# coding:utf-8
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2020 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
from __future__ import absolute_import
from __future__ import print_function
from lvemanager.helpers import exit_with_error, run_command
import json
from abc import abstractmethod
from clcommon.lib.whmapi_lib import WhmApiRequest, WhmApiError
import sys
OWNER_ADMIN = 'admin'
OWNER_USER = 'user'
UAPI_IGNORED_ERRORS = ["The event UAPI::LangPHP::php_set_vhost_versions was handled successfully."]
def get_cpanel_api_class(owner):
"""
Factory method that returns
certain class depending on
owner's type
:param owner: Can be 'admin' or 'user'
:return:
"""
if owner == OWNER_ADMIN:
return CPanelAdminApi()
return CPanelUserApi()
class CPanelApi(object):
"""
Abstract method that defines abstract methods
that must be implemented and common methods of
the derived classes
"""
RETURN_JSON = '--output=json'
ALLOWED_OPERATIONS = {}
@abstractmethod
def check_operation_allowed(self, called_method):
"""
Checks whether operation is allowed to perform
:param called_method:
:return:
"""
raise NotImplementedError('Method check_operation_allowed must be implemented!')
@abstractmethod
def prepare_running_command(self, called_method, params, return_json):
"""
Depending on the passed arguments,
the method builds running command
"""
raise NotImplementedError('Method prepare_running_command must be implemented!')
@abstractmethod
def parse_result(self, called_method, result):
"""
Some results are parsed before sending back
to the called. This method parses it and returns
transformed result.
"""
raise NotImplementedError('Method parse_result must be implemented!')
@staticmethod
def check_method_allowed(method_name, methods):
"""
CPanel API contains several methods and
not all of them if supported by this utility.
"""
for method_object in methods:
if method_name == method_object['method']:
return True
return False
def check_for_errors(self, result):
"""
Checks whether CPanel API returned
error or not
:param result:
:return:
"""
raise NotImplementedError('Method check_for_errors must be implemented!')
@staticmethod
def get_method_parser(method_name, methods):
"""
Some result must be parsed before
they are sent back to the caller. This
method extracts the method that must be executed
upon the result to transform it.
"""
for method_object in methods:
if method_name == method_object['method']:
return method_object.get('parser', None)
def get_ignore_errors(self, method_name, methods):
"""
return ignore error flag for method
:return:
"""
for method_object in methods:
if method_name == method_object['method']:
return method_object.get('ignore_errors', None)
@staticmethod
def parse_vhost_versions(result):
"""
Method parses the result of the method 'parse_vhost_versions'.
The host will be inherited when its field 'sys_default'
inside 'php_version_source' is equal to 1.
"""
parsed_result = []
for r in result:
parsed_result.append({
'version': r.get('version'),
'host': r.get('vhost'),
'php_fpm': True if r.get('php_fpm') else False,
'inherited': True if r.get('php_version_source', {}).get('sys_default') == 1 else False
})
return parsed_result
def run(self, called_method, params, return_json=True):
"""
Default method which first checks whether operation
is allowed then if allowed runs prepared command.
After receiving the result, it will call parse_result method
to transform result into desired format.
"""
if self.check_operation_allowed(called_method):
prepared_command = self.prepare_running_command(called_method, params, return_json)
code, output, std_err = run_command(prepared_command, return_full_output=True)
if code != 0:
exit_with_error(std_err or 'output of the command: %s\n%s' % (prepared_command, output))
try:
result = json.loads(output)
return self.parse_result(called_method, result)
except ValueError as e:
exit_with_error(e)
else:
exit_with_error('Not allowed operation: {}'.format(called_method))
class CPanelAdminApi(CPanelApi):
COMMAND = '/usr/local/cpanel/bin/whmapi1'
ALLOWED_OPERATIONS = [
{
'method': 'php_get_vhost_versions',
'parser': lambda result: CPanelAdminApi.parse_vhost_versions(result)
},
{'method': 'php_get_system_default_version'},
{'method': 'php_set_vhost_versions'},
{'method': 'php_get_installed_versions'}
]
def check_operation_allowed(self, called_method):
return self.check_method_allowed(called_method, self.ALLOWED_OPERATIONS)
def prepare_running_command(self, called_method, params, return_json):
transformed_params = {}
for param in params:
key, value = param.split('=')
transformed_params[key] = value
return transformed_params
def parse_result(self, called_method, result):
parser = self.get_method_parser(called_method, self.ALLOWED_OPERATIONS)
if parser is not None:
return parser(result)
return result
@staticmethod
def parse_vhost_versions(result):
parsed_result = []
for r in result['versions']:
parsed_result.append({
'version': r.get('version'),
'host': r.get('vhost'),
'user': r.get('account'),
'php_fpm': True if r.get('php_fpm') else False,
'inherited': True if r.get('php_version_source', {}).get('sys_default') == 1 else False
})
return parsed_result
def run(self, called_method, params, return_json=True):
params = self.prepare_running_command(called_method, params, return_json)
if self.check_operation_allowed(called_method):
try:
return self.parse_result(called_method, WhmApiRequest(called_method).with_arguments(**params).call())
except WhmApiError as e:
print(e)
sys.exit(1)
else:
exit_with_error('Not allowed operation: {}'.format(called_method))
class CPanelUserApi(CPanelApi):
COMMAND = '/usr/bin/uapi'
ALLOWED_OPERATIONS = {
'LangPHP': [
{'method': 'php_set_vhost_versions'},
{
'method': 'php_get_installed_versions',
'ignore_errors': True,
},
{
'method': 'php_get_vhost_versions',
'parser': lambda result: CPanelApi.parse_vhost_versions(result),
'ignore_errors': True,
},
{
'method': 'php_get_system_default_version',
'ignore_errors': True,
}
]
}
def check_class_allowed(self, class_name):
"""
CPanel UAPI requires to pass class name and
only few of classes are supported by this utility.
"""
return class_name in self.ALLOWED_OPERATIONS.keys()
def check_operation_allowed(self, called_method):
"""
Checks whether operation is allowed or not.
By CPanel UAPI supports several operations
but only few of them can be called via this utility.
"""
class_name, method_name = self.extract_class_and_method(called_method)
if self.check_class_allowed(class_name):
return self.check_method_allowed(method_name, self.ALLOWED_OPERATIONS[class_name])
return False
@staticmethod
def extract_class_and_method(called_method):
try:
class_name, method_name = called_method.split('::')
return class_name, method_name
except ValueError:
exit_with_error('Invalid method is passed: {}'.format(called_method))
def prepare_running_command(self, called_method, params, return_json):
class_name, method_name = self.extract_class_and_method(called_method)
params = [self.COMMAND, class_name, method_name] + params
if return_json:
params.append(self.RETURN_JSON)
return params
def parse_result(self, called_method, result):
class_name, method_name = self.extract_class_and_method(called_method)
ignore_errors = self.get_ignore_errors(method_name, self.ALLOWED_OPERATIONS[class_name])
self.check_for_errors(result, ignore_errors)
parser = self.get_method_parser(method_name, self.ALLOWED_OPERATIONS[class_name])
if parser is not None:
return parser(result['result']['data'])
return result['result']['data']
def check_for_errors(self, result, ignore_errors = False):
errors = result['result'].get('errors')
if not errors:
return
for error in errors:
# TODO: The next line is workaround of cPanel issue CPANEL-35122
# see https://support.cpanel.net/hc/en-us/articles/1500004317181-PHP-Selector-change-to-CloudLinux-PHP-version-reports-a-false-error
if error in UAPI_IGNORED_ERRORS:
continue
exit_with_error(' '.join(errors), ignore_errors=ignore_errors)