# coding:utf-8
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
from __future__ import print_function
from __future__ import division
from __future__ import absolute_import
import copy
import sys
import json
import argparse
import socket
import base64
import os
import subprocess
import re
from enum import Enum
from dataclasses import dataclass
from typing import Callable, List, Optional
from past.builtins import basestring, unicode # noqa
from future.utils import iteritems
from clcommon.utils import silence_stdout_until_process_exit, get_cl_version
from cllicense import CloudlinuxLicenseLib
from cpanel_api import get_cpanel_api_class
from clcommon.lib.cledition import is_cl_solo_edition, is_container
from clcommon.cpapi import is_hitting_max_accounts_limit, get_main_username_by_uid
LVEMANAGER_PLUGIN_NAMES = {
'python_selector': 'Python Selector',
'nodejs_selector': 'Node.js Selector',
'php_selector': 'PHP Selector',
'resource_usage': 'Resource Usage',
'wpos': 'AccelerateWP'
}
PASSENGER_DEPEND_PLUGINS = ['python_selector', 'nodejs_selector']
DEFAULT_PLUGIN_NAME = 'CloudLinux Manager'
CAGEFS_ENTER_PROXIED_BIN = '/usr/bin/cagefs_enter.proxied'
if not os.path.exists(CAGEFS_ENTER_PROXIED_BIN):
CAGEFS_ENTER_PROXIED_BIN = '/bin/cagefs_enter.proxied'
def is_json(data):
try:
json.loads(data)
return True
except ValueError as error:
return False
class CloudlinuxCliBase(object):
request_data = {}
result = None
available_request_params = [
'owner', 'command', 'method', 'params', 'user_info', 'mockJson', 'attachments', 'plugin_name', 'lang'
]
NOT_FLAGGED_PARAMS = ['config-files', 'content', 'passenger-log-file', 'ignore-list', 'wp-path', 'upgrade-url']
license_is_checked = False
current_plugin_name = ''
licence = CloudlinuxLicenseLib()
def __init__(self):
self.skip_cagefs_check = False
self.user_info = {}
self.parsing_request_data()
self.check_xss()
self.drop_permission()
self.command_methods = {
'spa-ping': self.spa_ping,
'cloudlinux-top': self.cl_top,
'cloudlinux-selector': self.cl_selector,
'cloudlinux-statistics': self.cl_statistics,
'cloudlinux-charts': self.cl_chart,
'cloudlinux-quota': self.cl_quota,
'cpanel-api': self.cpanel_api,
'cloudlinux-xray-user-manager': self.cl_xray_user_manager,
'cloudlinux-statsnotifier': self.cl_statsnotifier,
'cloudlinux-awp-user': self.cloudlinux_awp_user,
'cl-smart-advice-user': self.cl_smart_advice_user,
'cl-install-plugin': self.cl_install_plugin
}
def check_xss(self):
for key in self.request_data.keys():
if key not in self.available_request_params:
self.exit_with_error('BAD REQUEST 1:' + key)
for name, val in iteritems(self.request_data):
if isinstance(val, dict): # if post key is "params"
for key, inner_value in iteritems(val):
self.check_param_key(key)
if self.request_data['command'] == 'cloudlinux-packages' \
and name == 'params' \
and key == 'package':
self.request_data[name][key] = self.escape_param_value(inner_value)
elif self.request_data['command'] == 'cloudlinux-support':
pass
elif self.request_data['command'] == 'cloudlinux-selector' \
and name == 'params' \
and key == 'options':
pass
elif self.request_data['command'] == 'lvectl' \
and name == 'params' \
and key == 'stdin':
pass
elif self.request_data['command'] == 'cloudlinux-selector' \
and name == 'params' \
and key == 'env-vars':
pass
elif self.request_data['command'] == 'cloudlinux-xray-manager' \
and name == 'params' \
and key == 'url':
pass
elif self.request_data['command'] == 'cloudlinux-xray-user-manager' \
and name == 'params' \
and key == 'url':
pass
elif self.request_data['command'] == 'wmt-api' \
and name == 'params' \
and key == 'config-change':
pass
elif self.request_data['command'] == 'cloudlinux-xray-manager' \
and name == 'params' \
and key == 'email':
pass
elif self.request_data['command'] == 'cloudlinux-awp-admin' \
and name == 'params' \
and key == 'upgrade-url':
pass
else:
self.check_param_value(inner_value)
else:
self.check_param_value(val)
def get_env(self):
"""
Get env for subprocess call
"""
env_copy = os.environ.copy()
if self.request_data.get('lang'):
lang = self.request_data.get('lang')
if not re.match(r'^[a-z]{2}$', lang):
lang = 'en'
env_copy['LC_ALL'] = lang
return env_copy
def get_server_ip(self):
"""
Get the server's IP address.
"""
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
return s.getsockname()[0]
except Exception as e:
return None
def check_param_key(self, key):
if not re.search('^[\w\-]+$', key):
self.exit_with_error('BAD REQUEST 2')
def check_param_value(self, val):
if isinstance(val, basestring):
if re.search('[`\|\$;&\n]', val, re.M):
self.exit_with_error('BAD REQUEST 3')
def escape_param_value(self, val):
chars = "\\\"\'"
for c in chars:
val = val.replace(c, "\\" + c)
return val
def main(self):
command = self.request_data['command']
endpoint = self.command_methods.get(command)
allowed_methods = ['cloudlinux-license', 'external-info', 'spa-get-user-info'] # not requires license check
if endpoint:
if not self.license_is_checked and command not in allowed_methods:
self.check_license()
if 'mockJson' in self.request_data:
self.spa_mock(self.request_data['mockJson'])
endpoint()
else:
if command:
self.exit_with_error("No such module " + command)
else:
self.exit_with_error("Command not defined")
def parsing_request_data(self):
"""
parsing entry data, encode it from base64 to dictionary
:return:
"""
parser = argparse.ArgumentParser()
parser.add_argument('--data')
parser.add_argument('--skip-cagefs-check', action='store_true', default=False)
try:
arguments = parser.parse_args()
except:
self.exit_with_error("Unknown param in request")
if arguments.data:
data_in_base64 = arguments.data
data_in_json = base64.b64decode(data_in_base64).decode("utf-8")
try:
self.request_data = json.loads(data_in_json)
self.skip_cagefs_check = arguments.skip_cagefs_check
except ValueError:
self.exit_with_error("Need json-array")
self.user_info = self.get_user_info()
self.define_current_plugin()
else:
self.exit_with_error("No --data param in request")
def get_user_info(self):
user_info = self.request_data.get('user_info') or {}
if self.request_data['owner'] == 'user' and any(value is None for value in user_info.values()):
euid = os.geteuid()
username = get_main_username_by_uid(euid)
user_info = {'username': username, 'lve-id': euid}
return user_info
def cl_top(self):
# This imports from other package (cagefs), so we turn off pylint import checker for this line
from lvestats.lib.info.cloudlinux_top import CloudLinuxTop #pylint: disable=E0401
import lvestats.lib.config as config #pylint: disable=E0401
list_to_request = self.prepair_params_for_command()
result = ''
try:
result, exitcode = CloudLinuxTop(config.read_config()).main(*list_to_request)
except config.ConfigError as ce:
ce.log_and_exit()
self.exit_with_error(str(ce))
if self.request_data.get('owner') == 'user':
json_result = {}
try:
json_result = json.loads(result)
except:
self.exit_with_error(result)
if json_result.get('result') != 'success':
self.exit_with_error(json_result.get('result'), json_result.get('context'), ignore_errors=True)
print(result)
silence_stdout_until_process_exit()
sys.exit(exitcode)
def cl_quota(self):
list_to_request = self.prepair_params_for_command()
result = self.run_util('/usr/bin/cl-quota', *list_to_request, ignore_errors=True)
print(result)
def cl_xray_user_manager(self):
list_to_request = self.prepair_params_for_command()
list_to_request.remove("--json")
result = self.run_util('/opt/alt/php-xray/cloudlinux-xray-user-manager', *list_to_request, ignore_errors=False)
print(result)
def cl_smart_advice_user(self):
cli_command = '/opt/alt/php-xray/cl-smart-advice-user'
list_to_request = self.prepair_params_for_command(with_json=False)
# Workaround to run the command in background
if '--async' in list_to_request:
subprocess.Popen([cli_command, *list_to_request], stdin=subprocess.PIPE,stdout=subprocess.PIPE,
stderr=subprocess.PIPE, text=True)
self.exit_with_success()
result = self.run_util(cli_command, *list_to_request, ignore_errors=False)
print(result)
def cpanel_api(self):
owner = self.request_data.get('owner')
method = self.request_data.pop('method')
list_to_request = self.prepair_params_for_command(with_json=False, add_dash=False)
cpanel_api = get_cpanel_api_class(owner)
self.exit_with_success({'data': cpanel_api.run(method, list_to_request)})
def cl_chart(self):
list_to_request = self.prepair_params_for_command()
try:
list_to_request.remove("--json")
except ValueError:
pass
for param in list_to_request:
if param.startswith('--output'):
self.exit_with_error('BAD REQUEST 2')
list_to_request.insert(0, '/usr/sbin/lvechart')
response = subprocess.check_output(list_to_request, shell=False, text=True)
print(json.dumps({"result": "success", "chart": response}))
silence_stdout_until_process_exit()
sys.exit(0)
def drop_permission(self):
"""
Drop permission to users, if owner of script is user
:return:
"""
data = self.request_data
if data['owner'] in ['reseller', 'user'] and\
('lve-id' not in self.user_info or
'username' not in self.user_info):
self.exit_with_error("User id does not specified")
def prepair_params_for_command(self, with_json=True, escaped_strings=False, add_dash=True):
"""
Method that converts given dict of parameters
into list of strings that should be passed
as arguments command-line application
:param with_json: add --json argument
:param escaped_strings: ONLY FOR BACKWARDS COMPATIBILITY!
SHOULD BE False FOR ALL NEW METHODS!
:param add_dash: if we need to add dashes to params
:return:
"""
value_template = "--{0}={1}" if add_dash else "{0}={1}"
data = copy.deepcopy(self.request_data)
list_to_request = []
if "method" in data:
for method in data["method"].split(' '):
list_to_request.append(method)
if "params" not in data:
data['params'] = {}
if "json" not in data['params'] and with_json:
data['params']['json'] = ''
for param, value in iteritems(data['params']):
if param != 'additional-params':
# TODO: looks like we can remove option escaped_strings
# and always use value.encode('utf-8') here
# same goal may be reached using utils.byteify(json.loads(...))
# but to do that, we need some tests covering unicode params
# (especially for cloudlinux-packages)
# unfortunately, we do not have one ;(
# THIS IS NEEDED ONLY FOR CL-PACKAGES UTILITY
if value and escaped_strings is True:
list_to_request.append(value_template.format(param, value.encode('unicode-escape').decode()))
elif (value or param in self.NOT_FLAGGED_PARAMS) and escaped_strings is False:
list_to_request.append(value_template.format(param, value))
else:
list_to_request.append("--{0}".format(param))
if self.request_data['owner'] == 'reseller':
list_to_request.append('--for-reseller={0}'.format(self.user_info['username']))
if 'additional-params' in data['params'] \
and data['params']['additional-params'] != '':
list_to_request.append("--")
for param in data['params']['additional-params'].split():
list_to_request.append("{0}".format(param))
return list_to_request
def is_edition_migration_available(self):
# check if edition migration is supported
return os.path.isfile('/usr/sbin/clncheck')
def update_license(self):
# Register by broken license
with open(os.devnull, 'w') as devnull:
clnreg_cmd = ['/usr/sbin/clnreg_ks', '--force']
if self.is_edition_migration_available():
clnreg_cmd.append('--migrate-silently')
subprocess.call(clnreg_cmd, stderr=devnull, stdout=devnull, shell=False)
subprocess.call(['/usr/bin/cldetect', '--update-license'], stderr=devnull, stdout=devnull, shell=False)
self.check_license(False)
def check_license(self, with_recovery=True):
if not self.kernel_is_supported():
if self.request_data['owner'] in ['reseller']:
self.exit_with_error(
code=503,
error_id='ERROR.not_available_plugin',
context={'pluginName': LVEMANAGER_PLUGIN_NAMES.get(self.current_plugin_name, DEFAULT_PLUGIN_NAME)},
icon='disabled')
elif self.request_data['owner'] in ['admin']:
self.exit_with_error('Kernel is not supported')
if is_hitting_max_accounts_limit():
if self.request_data['owner'] == 'admin':
self.exit_with_error('ERROR.hitting_max_accounts_limit')
if self.request_data['owner'] == 'user':
self.exit_with_error(
code=503,
error_id='ERROR.not_available_plugin',
context={'pluginName': LVEMANAGER_PLUGIN_NAMES.get(self.current_plugin_name, DEFAULT_PLUGIN_NAME)},
icon='disabled')
if not self.licence.get_license_status():
if self.request_data['owner'] in ['reseller', 'user']:
interpreter = 'nodejs'
if self.request_data.get('params') \
and self.request_data['params'].get('interpreter'):
interpreter = self.request_data['params']['interpreter']
pluginNames = {
'reseller': 'CloudLinux Manager',
'user': {'python': 'Python Selector', 'nodejs':'Node.js Selector'}
.get(interpreter, 'Node.js Selector')
}
self.exit_with_error(
code=503,
error_id='ERROR.not_available_plugin',
context={'pluginName': LVEMANAGER_PLUGIN_NAMES.get(self.current_plugin_name, DEFAULT_PLUGIN_NAME)},
icon='disabled')
else:
if with_recovery:
self.update_license()
else:
self.exit_with_error('License is not valid')
else:
self.license_is_checked = True
def exit_with_error(self, error_string='', context=None,
code=None, error_id=None, icon=None, ignore_errors=False):
result = {"result": error_string}
if context:
result['context'] = context
if code:
result['code'] = code
if error_id:
result['error_id'] = error_id
if icon:
result['icon'] = icon
if ignore_errors:
result['ignore'] = ignore_errors
print(json.dumps(result))
sys.exit(1)
def exit_with_success(self, response=None):
data = copy.deepcopy(response) if response else {}
data['result'] = 'success'
print(json.dumps(data))
sys.exit(0)
def cl_statistics(self):
# This imports from other package (cagefs), so we turn off pylint import checker for this line
from lvestats.lib.cloudlinux_statistics import main #pylint: disable=E0401
import lvestats.lib.config as config #pylint: disable=E0401
from lvestats.lib.dbengine import make_db_engine #pylint: disable=E0401
list_to_request = self.prepair_params_for_command()
try:
cnf = config.read_config()
dbengine = make_db_engine(cnf)
main(dbengine, argv_=list_to_request,server_id=cnf.get('server_id', 'localhost'))
silence_stdout_until_process_exit()
sys.exit(0)
except config.ConfigError as ce:
ce.log_and_exit()
self.exit_with_error(ce)
def spa_mock(self, file):
file_path = '/usr/share/l.v.e-manager/spa/src/jsons/%s.json' % (file)
# check if passed file param doesn't use relative path. E.g.: '../../file'
if os.path.realpath(file_path) != file_path:
self.exit_with_error('BAD REQUEST 3')
with open(file_path, 'r') as f:
print(f.read())
sys.exit(0)
def get_lve_version(self):
try:
ver = subprocess.check_output(
'cat /proc/lve/list | grep -Po \'^\d{1,2}:\'',
shell=True, executable='/bin/bash', text=True
).strip()
return int(ver[:-1])
except:
return 0
def get_cloudlinux_version(self):
return subprocess.check_output(
'uname -r | grep -Po \'el\d\w?\'',
shell=True, executable='/bin/bash', text=True
).strip()
# Common methods
def spa_ping(self):
self.exit_with_success()
def cl_selector(self):
try:
from clselector.cl_selector import CloudlinuxSelector
except:
self.exit_with_error('Module unavailable')
if self.user_info.get('username') and 'interpreter' in self.request_data['params']\
and self.request_data['params']['interpreter'] == 'php':
self.check_php_selector_user_availablility()
list_to_request = self.prepair_params_for_command()
cll = CloudlinuxSelector()
cll.run(list_to_request)
def check_php_selector_user_availablility(self):
"""
Additional check only for php selector
:return:
"""
try:
LIBDIR = '/usr/share/cagefs'
sys.path.append(LIBDIR)
import cagefsctl
if not cagefsctl.cagefs_is_enabled or \
not cagefsctl.is_user_enabled(self.user_info['username']):
raise RuntimeError('Cagefs is disabled or missing')
except (ImportError, RuntimeError):
self.exit_with_error(
code=503,
error_id='ERROR.cagefsDisabled',
)
from clselect.clselectexcept import BaseClSelectException
try:
from clselect import ClSelect
ClSelect.check_multiphp_system_default_version()
except (BaseClSelectException):
self.exit_with_error(
code=503,
error_id='ERROR.systemVersionAltPHP',
)
def define_current_plugin(self):
self.current_plugin_name = self.request_data.get('plugin_name')
def is_error_response_default(self, json_result):
return json_result.get('result') != 'success' and json_result.get('success') != 1
def run_util(self, name, *args, **kwargs):
command = [name] + list(args)
error_checker = kwargs.get('error_checker', self.is_error_response_default)
try:
p = subprocess.Popen(
command,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
env=self.get_env())
(result, err) = p.communicate(kwargs.pop('stdin', None))
is_error = p.returncode != 0 or not is_json(result)
if not is_error:
json_result = json.loads(result)
is_error = error_checker(json_result)
if is_error:
result = result + err
if is_json(result):
json_result = json.loads(result)
if json_result.get('message'):
json_result['result'] = json_result.pop('message')
result = json.dumps(json_result)
if kwargs.get("ignore_errors", False):
# Check new result concatenated with error
if is_json(result):
result = json.loads(result)
result['ignore'] = True
result = json.dumps(result)
else:
result = self.ignored_error_message(result)
print(result)
exit(1)
return result
except Exception as e:
self.exit_with_error("Can't run %(command)s", context={'command': ' '.join(command)}, ignore_errors=True)
def ignored_error_message(self, message):
return json.dumps({
"result": message,
"ignore": True
})
def kernel_is_supported(self):
try:
if is_container():
return True
if is_cl_solo_edition(skip_jwt_check=True):
# CL9 uses Alma kernel which doesn't have 'lve' in its name
if get_cl_version() == 'cl9':
return True
uname = subprocess.check_output('uname -r', shell=True, executable='/bin/bash', text=True)
return 'lve' in uname
else:
f = open('/proc/lve/list', 'r')
line = f.readline()
f.close()
return bool(line)
except IOError:
return False
def cl_statsnotifier(self):
from lvestats.lib.cloudlinux_statsnotifier import main #pylint: disable=E0401
list_to_request = self.prepair_params_for_command(with_json=True)
exit_code = main(args_=list_to_request) #pylint: disable=E0401
sys.exit(exit_code)
def cloudlinux_awp_user(self):
cli_command = '/usr/bin/cloudlinux-awp-user'
list_to_request = self.prepair_params_for_command(with_json=False)
result = self.run_util(cli_command, *list_to_request, ignore_errors=False)
print(result)
def cl_install_plugin(self):
"""
This method is needed just for dev server to allow work with mocks
"""
self.exit_with_success()
# user cli parts
class CommandType(Enum):
HEAVY = 'heavy'
SIMPLE = 'simple'
class ConfigLimitValue(Enum):
ALL = 'all' # limit all requests
HEAVY = 'heavy' # don't limit white-listed 'simple' requests
UNLIMITED = 'unlimited' # don't limit at all
@classmethod
def _missing_(cls, value):
return cls.ALL
@dataclass
class Rule:
callable: Callable
result: CommandType
class LimitStrategyBase:
"""
Base limits strategy to decide - run incoming request with or without cagefs limits
Strategy execution means that used script (cloudlinux_cli_user.py) will be re-executed with (or not)
additional cagefs flags
"""
cagefs_args: List[str]
def execute(self, command: str, args: List[str], request_data: dict) -> Optional[int]:
full_command = self.get_full_command(command, args, request_data)
p = subprocess.Popen(full_command)
p.communicate()
return p.returncode
def get_full_command(self, command: str, args: List[str], request_data: dict) -> List[str]:
cmd = [*sys.argv, f'--skip-cagefs-check']
return [CAGEFS_ENTER_PROXIED_BIN, *self.cagefs_args, *cmd]
class NoCagefsStrategy(LimitStrategyBase):
"""
Strategy for hardcoded commands, that should always run even without the cagefs with unknown reason
This strategy does not re-executes the script with `cagefs_enter.proxied`, just letting them to finish as is
TODO: LVEMAN-1767
"""
def execute(self, *args, **kwargs) -> Optional[int]:
return None
class AllLimitStrategy(LimitStrategyBase):
"""
Strategy to limit all commands
"""
cagefs_args = []
class NoLimitStrategy(LimitStrategyBase):
"""
Strategy to don't limit all commands
"""
cagefs_args = ['--no-io-and-memory-limit', '--no-cpu-limit', '--no-max-enter']
class LimitStrategyHeavy(LimitStrategyBase):
"""
Strategy to don't limit whitelisted commands
By default - all commands are HEAVY and will be limited
Add `rules` to mark any command as SIMPLE and run without limits
"""
cagefs_args = []
default_rule = Rule(callable=lambda args: True, result=CommandType.HEAVY)
rules = {
'cloudlinux-selector': [
Rule(callable=lambda args: 'get' in args, result=CommandType.SIMPLE),
Rule(callable=lambda args: 'start' in args, result=CommandType.SIMPLE),
Rule(callable=lambda args: 'restart' in args, result=CommandType.SIMPLE),
Rule(callable=lambda args: 'stop' in args, result=CommandType.SIMPLE),
]
}
def _check_rules(self, command: str, args: List[str]) -> CommandType:
command_type = None
for rule in self.rules.get(command, []) + [self.default_rule]:
if rule.callable(args):
command_type = rule.result
break
if command_type == CommandType.SIMPLE:
self.cagefs_args = ['--no-io-and-memory-limit', '--no-cpu-limit', '--no-max-enter']
else:
self.cagefs_args = []
return command_type
def get_full_command(self, command: str, args: List[str], request_data: dict) -> List[str]:
self._check_rules(command, args)
return super().get_full_command(command, args, request_data)