# coding:utf-8
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
from __future__ import print_function
from __future__ import division
from __future__ import absolute_import
import json
import logging
import subprocess
import os
import sys
from libcloudlinux import (
CloudlinuxCliBase, LVEMANAGER_PLUGIN_NAMES, DEFAULT_PLUGIN_NAME, PASSENGER_DEPEND_PLUGINS,
AllLimitStrategy, NoLimitStrategy, LimitStrategyHeavy, NoCagefsStrategy, LimitStrategyBase, ConfigLimitValue,
)
from clselector.clpassenger_detectlib import is_clpassenger_active
from clcommon import ClPwd
from clcommon.utils import is_litespeed_running
from clcommon.lib.cledition import is_cl_solo_edition
from cldetectlib import get_param_from_file
from clcommon.const import Feature
from clcommon.cpapi import is_panel_feature_supported
# Sysconfig file from which the `web_resource_limit_mode` parameter is read
CONFIG = '/etc/sysconfig/cloudlinux'
# Smart Advice user CLI; its presence on disk is reported as `smart_advice` in the user info
SMART_ADVICE_USER_CLI = '/opt/alt/php-xray/cl-smart-advice-user'
# Flag file; when it exists the user stats mode is reported as 'percent'
PERCENTS_STATS_MODE_FLAG = '/opt/cloudlinux/flags/enabled-flags.d/percentage-user-stats-mode.flag'
# NB: this logger writes to stderr while the resulting JSON goes to stdout, so with an active logger
# the web UI will not work properly because of the stderr redirection (2>&1).
# Therefore it MUST stay silent (NOTSET) in normal operation.
# A file logger cannot be used here either - the script works inside cagefs with the user's rights.
logger = logging.getLogger(__name__)
logger.setLevel(logging.NOTSET)
# default record format
init_formatter = logging.Formatter('[%(asctime)s] %(funcName)s:%(lineno)s - %(message)s')
# same format with a `{cagefs}` prefix; installed on the handler when `skip_cagefs_check` is set
cagefs_formatter = logging.Formatter('{cagefs} [%(asctime)s] %(funcName)s:%(lineno)s - %(message)s')
# the single stream handler; later code reaches it as `logger.handlers[0]`
h = logging.StreamHandler()
h.setFormatter(init_formatter)
logger.addHandler(h)
logger.debug('cli start')
class CloudlinuxCliUser(CloudlinuxCliBase):
    """
    User-level CLI built on top of `CloudlinuxCliBase`.

    Registers the user-side commands (domains, home directory, snapshots, user info)
    and selects a limit strategy that is executed before the command itself runs.
    """

    # strategy object whose `execute()` is called from `drop_permission()`
    limit_strategy: LimitStrategyBase

    def __init__(self):
        """
        Read `web_resource_limit_mode` from `CONFIG` and register the user-level commands.
        """
        limit_mode = get_param_from_file(CONFIG, 'web_resource_limit_mode', '=', ConfigLimitValue.ALL.value)
        try:
            self.web_resource_limit_mode = ConfigLimitValue(limit_mode)
        except ValueError:
            # an unknown value in the config must not crash the CLI: use the default mode
            logger.debug(f'Unknown web_resource_limit_mode {limit_mode!r}, falling back to default')
            self.web_resource_limit_mode = ConfigLimitValue.ALL
        super().__init__()
        self.command_methods.update({
            'spa-get-domains': self.spa_user_domains,
            'spa-get-homedir': self.spa_user_homedir,
            'cloudlinux-snapshots': self.cl_snapshots,
            'spa-get-user-info': self.spa_get_user_info,
        })

    def __init_limit_strategy(self):
        """
        Set default strategy from the `CONFIG` values
        """
        if self.skip_cagefs_check:
            # update log format to easier log review
            logger.handlers[0].setFormatter(cagefs_formatter)

        # we cannot use cagefs when it is not available
        if not is_panel_feature_supported(Feature.CAGEFS):
            self.limit_strategy = NoCagefsStrategy()
        else:
            strategy_by_mode = {
                ConfigLimitValue.ALL: AllLimitStrategy,
                ConfigLimitValue.HEAVY: LimitStrategyHeavy,
                ConfigLimitValue.UNLIMITED: NoLimitStrategy,
            }
            # any mode missing from the table is treated like ALL
            self.limit_strategy = strategy_by_mode.get(self.web_resource_limit_mode, AllLimitStrategy)()
        logger.debug(
            f'Limits strategy inited as {self.limit_strategy.__class__}'
            f'\n\tBecause of:'
            f'\n\tself.web_resource_limit_mode: {self.web_resource_limit_mode}'
        )

    def set_limit_strategy(self, strategy: LimitStrategyBase):
        """
        Explicitly replace the current limit strategy.

        :param strategy: strategy instance to use from now on
        """
        logger.debug(f'Limit strategy is explicitly set to {strategy.__class__}')
        self.limit_strategy = strategy

    def __check_exclusive_commands(self):
        """
        Check whether the command is currently run without cagefs;
        the commands list is taken from Spa.php `processRequest()`.
        This function should be removed in the same task as the `LimitStrategyNoCagefs`
        """
        data = self.request_data
        # `params` may be absent or explicitly null - treat both as empty
        params = data.get('params') or {}
        if params.get('interpreter') == 'php' or data.get('command') in {
            'cloudlinux-statistics',
            'cloudlinux-quota',
            'cloudlinux-top',
            'cloudlinux-snapshots',
            'cloudlinux-charts',
            'cloudlinux-statsnotifier',
            'spa-get-user-info',
            'cloudlinux-awp-user',
            'cloudlinux-xray-user-manager',
            'spa-get-domains',
            'cpanel-api',
            'cl-smart-advice-user',
        }:
            logger.debug('Executable command found in the exclusive list')
            self.set_limit_strategy(NoCagefsStrategy())

    def drop_permission(self):
        """
        Drop permission to users, if owner of script is user

        Initializes the limit strategy, rejects requests whose owner is not 'user',
        runs the command through the strategy (exiting with its return code when it
        is not None) and finally checks that the requested plugin is available.
        :return:
        """
        logger.debug(
            'drop permissions start'
            f'\n\targv is: {sys.argv}'
            f'\n\trequest data is: {self.request_data}'
        )
        self.__init_limit_strategy()
        data = self.request_data
        # a request without `owner` is rejected the same way as a non-user one
        if data.get('owner') != 'user':
            self.exit_with_error("User not allowed")
        super().drop_permission()
        args = self.prepair_params_for_command()
        logger.debug(f'prepared args is: {args}')
        if data.get('command'):
            if self.skip_cagefs_check:
                logger.debug('cagefs skipped: --skip-cagefs-check arg found')
            else:
                self.__check_exclusive_commands()
                # if rc is None - script won't enter the cagefs
                # otherwise - command is executed in the cagefs
                rc = self.limit_strategy.execute(data['command'], args, self.request_data)
                if rc is not None:
                    logger.debug(f'command executed inside of the cagefs with rc: {rc}')
                    sys.exit(rc)
                else:
                    logger.debug(f'cagefs skipped: strategy is {self.limit_strategy.__class__}')
        # skip checking plugin availability on spa-get-user-info
        if data.get('command') != 'spa-get-user-info':
            self.check_plugin_availability()
        logger.debug('drop permissons end')

    def spa_user_domains(self):
        """Print the user's domains as JSON and exit with code 0."""
        print(json.dumps({"result": "success", "list": self.get_user_domains()}))
        sys.exit(0)

    def spa_user_homedir(self):
        """Print the user's home directory as JSON and exit with code 0."""
        print(json.dumps({"result": "success", "homedir": self.get_user_homedir()}))
        sys.exit(0)

    def spa_get_user_info(self):
        """
        Print a JSON summary about the user and the server, then exit with code 0.

        Any regular error while collecting the data is reported as 'Module unavailable'.
        """
        try:
            print(json.dumps(
                {
                    "result": "success",
                    "domains": self.get_user_domains(),
                    "homedir": self.get_user_homedir(),
                    "is_litespeed_running": is_litespeed_running(),
                    "is_cl_solo_edition": is_cl_solo_edition(skip_jwt_check=True),
                    "smart_advice": os.path.isfile(SMART_ADVICE_USER_CLI),
                    "is_lve_supported": is_panel_feature_supported(Feature.LVE),
                    "user_stats_mode": self.get_stats_mode(),
                    "server_ip": self.get_server_ip()
                }))
        except Exception:
            # `Exception` only: an exit requested by a nested helper (SystemExit)
            # must propagate instead of being masked by a second error message
            self.exit_with_error('Module unavailable')
        sys.exit(0)

    def get_user_domains(self):
        """
        Return the first element of every entry given by `userdomains()` for the current user.

        Reports 'Module unavailable' when `clcommon.cpapi.userdomains` cannot be imported.
        """
        try:
            from clcommon.cpapi import userdomains
        except Exception:
            self.exit_with_error('Module unavailable')
        return [x[0] for x in userdomains(self.user_info['username'])]

    def get_stats_mode(self):
        """Return 'percent' when the percentage-stats flag file exists, 'default' otherwise."""
        if os.path.isfile(PERCENTS_STATS_MODE_FLAG):
            return 'percent'
        return 'default'

    def get_user_homedir(self):
        """
        Return the current user's home directory with a trailing slash.

        Reports 'No such user' when the lookup raises `KeyError`.
        """
        try:
            pwdir = ClPwd().get_homedir(self.user_info['username'])
            return pwdir + "/"
        except KeyError:
            self.exit_with_error('No such user')

    def cl_snapshots(self):
        """
        Run `/usr/sbin/lve-read-snapshot` with the prepared params and report its `data`.

        When the utility's output is not valid JSON, the raw output is reported as the error.
        """
        list_to_request = self.prepair_params_for_command()
        try:
            output = self.run_util('/usr/sbin/lve-read-snapshot', *list_to_request)
        except subprocess.CalledProcessError as processError:
            # a failed run may still carry a payload in its output
            output = processError.output
        try:
            result = json.loads(output)
        except (TypeError, ValueError):
            # ValueError covers JSONDecodeError; TypeError covers a non-string output
            self.exit_with_error(output)
            return
        self.exit_with_success({'data': result['data']})
        sys.exit(0)

    def check_plugin_availability(self):
        """
        Exit with a 503 error when the current plugin cannot be used.

        Checks, in order: the selector of the Node.js/Python plugin is enabled,
        Passenger is active for plugins listed in `PASSENGER_DEPEND_PLUGINS`,
        and the plugin-specific availability checker passes.
        """
        plugin_names = {
            'nodejs_selector': 'Node.js Selector',
            'python_selector': 'Python Selector',
        }
        selector_enabled = True
        manager = None
        try:
            if self.current_plugin_name == 'nodejs_selector':
                from clselect.clselectnodejs.node_manager import NodeManager
                manager = NodeManager()
            elif self.current_plugin_name == 'python_selector':
                from clselect.clselectpython.python_manager import PythonManager
                manager = PythonManager()
            if manager:
                selector_enabled = manager.selector_enabled
        except Exception:
            # a manager that cannot be imported or created counts as a disabled selector
            selector_enabled = False
        if not selector_enabled:
            self.exit_with_error(
                code=503,
                error_id='ERROR.not_available_plugin',
                context={'pluginName': plugin_names.get(self.current_plugin_name, 'Plugin')},
                icon='disabled')
        plugin_available_checker = {
            'nodejs_selector': self._plugin_available_nodejs,
            'python_selector': self._plugin_available_python,
            'php_selector': self._plugin_available_php,
            'resource_usage': self._plugin_available_resource_usage,
        }.get(self.current_plugin_name)
        # plugins without a dedicated checker are considered available
        if plugin_available_checker:
            plugin_available = plugin_available_checker()
        else:
            plugin_available = True
        if not is_clpassenger_active() and self.current_plugin_name in PASSENGER_DEPEND_PLUGINS:
            self.exit_with_error(
                code=503,
                error_id='ERROR.not_available_passenger',
                context={'pluginName': LVEMANAGER_PLUGIN_NAMES.get(self.current_plugin_name, DEFAULT_PLUGIN_NAME)},
                icon='disabled')
        if not plugin_available:
            self.exit_with_error(
                code=503,
                error_id='ERROR.not_available_plugin',
                context={'pluginName': LVEMANAGER_PLUGIN_NAMES.get(self.current_plugin_name, DEFAULT_PLUGIN_NAME)},
                icon='disabled')

    def _plugin_available_nodejs(self):
        """Return True when the Node.js selector is enabled and Passenger is active."""
        try:
            from clselect.clselectnodejs.node_manager import NodeManager
            manager = NodeManager()
            # Passenger is only queried when the selector itself is enabled
            return bool(manager.selector_enabled) and bool(is_clpassenger_active())
        except Exception:
            return False

    def _plugin_available_python(self):
        """Return True when the Python selector is enabled and Passenger is active."""
        try:
            from clselect.clselectpython.python_manager import PythonManager
            manager = PythonManager()
            # Passenger is only queried when the selector itself is enabled
            return bool(manager.selector_enabled) and bool(is_clpassenger_active())
        except Exception:
            return False

    def _plugin_available_php(self):
        """Return True when the PHP selector is enabled."""
        try:
            from clselect.clselectphp.php_manager import PhpManager
            manager = PhpManager()
            return bool(manager.selector_enabled)
        except Exception:
            return False

    def _plugin_available_resource_usage(self):
        """Resource usage has no extra requirements: always available."""
        return True