# coding=utf-8 # # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved # # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENSE.TXT import configparser import grp import json import os import pwd import random import re import subprocess import sys import uuid from collections import namedtuple from functools import wraps from pathlib import Path from typing import AnyStr, List, Optional, Tuple # NOQA import cldetectlib as detect from cl_proc_hidepid import get_hidepid_typing_from_mounts from clcommon.clpwd import ClPwd, drop_privileges from clcommon.cpapi import Feature, is_panel_feature_supported from clcommon.lib.cledition import CLEditionDetectionError, is_cl_solo_edition from clcommon.lib.cmt_utils import ( is_client_enabled, is_cmt_disabled, ) from clcommon.lib.consts import DEFAULT_JWT_ES_TOKEN_PATH, DISABLE_CMT_FILE from clcommon.lib.jwt_token import jwt_token_check from clcommon.lib.whmapi_lib import WhmApiError, WhmApiRequest from clcommon.utils import ( ExternalProgramFailed, demote, is_litespeed_running, is_ubuntu, process_is_running, run_command, service_is_enabled_and_present, ) from cllimits_validator import LimitsValidator from clsentry.utils import get_pkg_version # Possible result types (ChkStatus?) OK = "OK" # 'PASSED' is better? FAILED = "FAILED" SKIPPED = "SKIPPED" INTERNAL_TEST_ERROR = "INTERNAL_TEST_ERROR" cldiag_doc_link = "https://docs.cloudlinux.com/command-line_tools/" cron_cldiag_checkers_param_name = "disabled_cldiag_cron_checkers" cron_cldiag_section_name = "cldiag_cron" cl_plus_doc_link = "https://docs.cloudlinux.com/cloudlinux-os-plus/#faq-2" cl_plus_doc_msg = f"Link to FAQ and troubleshooting {cl_plus_doc_link}" write_to_support_msg = "Please write to support https://cloudlinux.zendesk.com/ if you can't resolve the issue." cm_full_name = "Centralized Monitoring" SKIPPED_ON_SOLO_MSG = "This checker is not supported on CloudLinux OS Solo edition" SKIPPED_WITHOUT_LVE_MSG = "This checker is not supported in environments without LVE support" ChkResult = namedtuple( "ChkResult", [ "res", # One of predefined checker result types "msg", # Resulting msg from this checker ], ) SUEXEC_PATH = { "cPanel": "/usr/local/apache/bin/suexec", "cPanel_ea4": "/usr/sbin/suexec", "DirectAdmin": "/usr/sbin/suexec", "Plesk": "/usr/sbin/suexec", "ISPManager": "/usr/sbin/suexec", "InterWorx": "/usr/sbin/suexec", "H-Sphere": "/usr/sbin/suexec", "HostingNG": "/usr/sbin/suexec", "Unknown": "/usr/sbin/suexec", } SUPHP_PATH = { "cPanel": "/opt/suphp/sbin/suphp", "cPanel_ea4": "/usr/sbin/suphp", "DirectAdmin": "/usr/local/suphp/sbin/suphp", "Plesk": "/usr/sbin/suphp", "ISPManager": "/usr/sbin/suphp", "InterWorx": "/usr/sbin/suphp", "H-Sphere": "/usr/sbin/suphp", "HostingNG": "/usr/sbin/suphp", "Unknown": "/usr/sbin/suphp", } BINARY_CHECK_PARAMETERS = {} BINARY_CHECK_PARAMETERS["suphp"] = { "name": "SuPHP", "status_function": "detect.get_suPHP_status()", "location": SUPHP_PATH, } BINARY_CHECK_PARAMETERS["suexec"] = { "name": "SuEXEC", "status_function": "detect.get_suEXEC_status()", "location": SUEXEC_PATH, } _CLDIAG_USERNAME_FILE = "/var/lve/cldiag_user" _CLDIAG_TEST_USENAME_PREFIX = "cldiaguser" def pretty_name(name_of_checker): def decorator(func): func.pretty_name = name_of_checker return func return decorator def _formatter(data, error_count, to_json=False): """ Formatter of output from all of checkers """ msg = "Command for disabling this cron checker: " cmd_tmp = "cldiag --disable-cron-checkers" if to_json: res = {checker_pretty_name: chk_result._asdict() for checker_pretty_name, _, chk_result in data} res["total_errors"] = error_count return json.dumps(res) res = [] for checker_pretty_name, checker_public_name, chk_result in data: checker_result = f"{checker_pretty_name}:\n {chk_result.res}: " f"{chk_result.msg}" if checker_public_name is not None: checker_result = f"{checker_result}\n" f'{msg} "{cmd_tmp} {checker_public_name}"' res.append(checker_result) res = "\n\n".join(res + [f"There are {error_count} errors found."]) return res def runner(checkers, to_json=False, do_exit=True): if callable(checkers): # allow single checker as input too checkers = [checkers] results = [] errors = 0 for f in checkers: try: chk_result = f() except Exception as e: chk_result = ChkResult(INTERNAL_TEST_ERROR, repr(e)) if chk_result.res in ( FAILED, INTERNAL_TEST_ERROR, ): errors += 1 results.append( ( f.pretty_name, f.public_name if hasattr(f, "public_name") else None, chk_result, ) ) res = _formatter(results, errors, to_json) if do_exit: print(res) sys.exit(errors) return errors, res def wrapper(func): try: return eval(func) except AttributeError: print(f"WARNING\n missing {func} function in cldetectlib.") return False def skip_checker_on_cl_solo(f): @wraps(f) def checker(*args, **kwargs): try: is_solo_edition = is_cl_solo_edition(skip_jwt_check=True) except CLEditionDetectionError: is_solo_edition = False if is_solo_edition: return ChkResult(SKIPPED, SKIPPED_ON_SOLO_MSG) return f(*args, **kwargs) return checker def skip_check_without_lve(f): @wraps(f) def checker(*args, **kwargs): if not is_panel_feature_supported(Feature.LVE): return ChkResult(SKIPPED, SKIPPED_WITHOUT_LVE_MSG) return f(*args, **kwargs) return checker @pretty_name("Check cagefs") def fake_cagefs_checker(): return ChkResult( SKIPPED, "Cagefs version is too old. " "Please run cagefsctl --sanity-check directly " "or upgrade it to have full cldiag integration", ) def _is_cmt_allowed_for_server() -> Tuple[bool, Optional[str]]: """ Check that a server is cl+, enabled and CM isn't disabled locally The function returns True if the client has CL+ license, didn't disable CM localy and activated CM on https://cm.cloudlinux.com. The function also returns True if we can't read or parse JWT token, because we want to continue and show to client CM related errors """ cm_is_not_activated_msg = f"{cm_full_name} is not activated" " on https://cm.cloudlinux.com" cm_is_disabled_localy_msg = f"The {cm_full_name} is disabled localy" f' by creating file "{DISABLE_CMT_FILE}"' no_cl_plus_license_msg = "The server has no CL+ license" from clsummary.utils import get_client_data_from_jwt_token # pylint: disable=import-outside-toplevel jwt_token = get_client_data_from_jwt_token() if jwt_token is not None and not jwt_token["cl_plus"]: # we do nothing if client doesn't have CL+ license return False, no_cl_plus_license_msg # we should check the state of JWT token if we didn't take data from it if jwt_token is None: is_valid, message, _ = jwt_token_check() if not is_valid: return is_valid, message if is_cmt_disabled(): # we do nothing if cmt is disabled locally return False, cm_is_disabled_localy_msg # we do nothing if client isn't enabled in CM if not is_client_enabled(): return False, cm_is_not_activated_msg # We want to continue checks in case of problems with jwt token # because we want to show cmt related errors to client. return True, None def skip_if_cmt_not_used_enabled_allowed(f): """ Decorator: Skip check if a server isn't cl+, disabled and CM is disabled locally """ @wraps(f) def decorated_function(*args, **kwargs): """ Decorated function """ result, message = _is_cmt_allowed_for_server() if result: return f(*args, **kwargs) return ChkResult( SKIPPED, message, ) return decorated_function @pretty_name("Check existing JWT token") def check_jwt_token(): """ Check an existing JWT token """ token_is_absent_msg = " The absence of JWT tokens is normal for the clients with volume license. " main_msg = ( "Please check for JWT token in path " f'"{DEFAULT_JWT_ES_TOKEN_PATH}". %sTry running "rhn_check"' " for getting a new token if it is absent. Server can't " f"collect and send statistics to {cm_full_name} if you " f"don't have a correct JWT token. {cl_plus_doc_msg}. " f"{write_to_support_msg}" ) token_is_not_cl_plus = "JWT token doesn't have CL+ service" from clsummary.utils import get_client_data_from_jwt_token # pylint: disable=import-outside-toplevel if not os.path.exists(DEFAULT_JWT_ES_TOKEN_PATH): return ChkResult( SKIPPED, main_msg % token_is_absent_msg, ) result, message, _ = jwt_token_check() if result: jwt_token = get_client_data_from_jwt_token() return ChkResult(OK, f'JWT token is valid: "{jwt_token}"') if message == token_is_not_cl_plus: return ChkResult( SKIPPED, "The server has no CL+ license", ) main_msg = main_msg % "" return ChkResult(FAILED, f"{message}. {main_msg}") def _check_service_state(service_name: str, process_file_path: str) -> ChkResult: """ Check that a service is present, enabled and active :param service_name: name of a service :param process_file_path: path to a file which is run by a service """ is_present, is_enabled = service_is_enabled_and_present(service_name) try: is_active = process_is_running(process_file_path, False) except FileNotFoundError: is_active = False if is_present and is_enabled and is_active: return ChkResult( OK, f'Service "{service_name}" is present, enabled and active', ) messages = [] if not is_present: messages.append("Service is not present.") if not is_enabled: messages.append("Service is not enabled.") if not is_active: messages.append("Service is not active.") return ChkResult( FAILED, f"{' '.join(messages)} The server can't collect and send " f"statistics to {cm_full_name} if service {service_name} isn't " f"present, enabled and active. {cl_plus_doc_msg}. " f"{write_to_support_msg}", ) @pretty_name("Check service `cl_plus_sender` is present, enabled and active") @skip_checker_on_cl_solo @skip_if_cmt_not_used_enabled_allowed def check_cl_plus_sender_service(): """ Check that service `cl_plus_sender` is present, enabled and active """ from clsummary.utils import CL_PLUS_SENDER_FILE_PATH # pylint: disable=import-outside-toplevel service_name = "cl_plus_sender" return _check_service_state(service_name, CL_PLUS_SENDER_FILE_PATH) @pretty_name("Check service `node_exporter` is present, enabled and active") @skip_checker_on_cl_solo @skip_if_cmt_not_used_enabled_allowed def check_node_exporter_service(): """ Check that service `node_exporter` or `cl_node_exporter` is present, enabled and active Since it was renamed node_exporter -> cl_node_exporter let`s handle both cases: - old `node_exporter` service - renamed `cl_node_exporter` service """ base_service_path = "/usr/share/cloudlinux/cl_plus/service/" process_file_path = "/usr/share/cloudlinux/cl_plus/node_exporter" # looking for cl_node_exporter on cl6, cl_node_exporter.service on cl7+ if os.path.exists(os.path.join(base_service_path, "cl_node_exporter")) or os.path.exists( os.path.join(base_service_path, "cl_node_exporter.service") ): service_name = "cl_node_exporter" else: service_name = "node_exporter" return _check_service_state(service_name, process_file_path) @pretty_name("Check service `lvestats` is present, enabled and active") @skip_checker_on_cl_solo @skip_if_cmt_not_used_enabled_allowed def check_lvestats_service(): """ Check that service `lvestats` is present, enabled and active """ service_name = "lvestats" process_file_path = "/usr/share/lve-stats/lvestats-server.py" return _check_service_state(service_name, process_file_path) @pretty_name("Check that the server has the minimal required packages for correct working of Centralized Monitoring") @skip_checker_on_cl_solo @skip_if_cmt_not_used_enabled_allowed def check_cmt_packages(): """ Check that the server has minimal required packages for CM """ for package_name in ["cl-end-server-tools", "cl-node-exporter"]: if get_pkg_version(package_name) is None: return ChkResult( FAILED, "System doesn't have the package " f'"{package_name}". It\'s required for {cm_full_name} ' "feature to work and it usually installed " f"automatically by cron. {cl_plus_doc_msg}. " f"{write_to_support_msg}", ) return ChkResult(OK, "System has the minimal required packages for correct working of Centralized Monitoring") @pretty_name("Check control panel and it's configuration (for DirectAdmin only)") def check_cp_diag(): fix_motivation = ( " Fixing the issue will provide CloudLinux support on your control panel. \n" f"See details: {cldiag_doc_link + '#diag-cp'}" ) detect.getCP() cp_name = detect.getCPName() if cp_name == "Unknown": return ChkResult(SKIPPED, "Can't detect contol panel") res_msg = f"Control Panel - {cp_name}; Version {detect.CP_VERSION};" # we are not setting cloudlinux yes on CL SOLO if not is_cl_solo_edition(skip_jwt_check=True) and cp_name == "DirectAdmin": if detect.da_check_options(): return ChkResult(OK, res_msg + ' File "options.conf" is fine') return ChkResult(FAILED, res_msg + ' File "options.conf" has no line "cloudlinux=yes"' + fix_motivation) return ChkResult(OK, res_msg) @pretty_name("Check fs.enforce_symlinksifowner is correctly enabled in sysctl conf") @skip_check_without_lve def check_symlinksifowner(): fix_motivation = ( " Fixing that issue makes server more secure against " "symlink attacks and enables protection of PHP configs " f"or other sensitive files. \nSee details: {cldiag_doc_link + '#symlinksifowner'}" ) if detect.is_openvz(): return ChkResult(SKIPPED, "Not supported for OpenVZ environment") try: symlinks_if_owner = detect.get_symlinksifowner() except ExternalProgramFailed as e: detailed_out = "To see full error run /sbin/sysctl --system" return ChkResult( FAILED, "Some parameter in sysctl config has wrong configuration. " f"Error: {get_short_error_message(str(e), detailed_out)} It`s recommended to fix it and try again ", ) if symlinks_if_owner == 2: return ChkResult(FAILED, "fs.enforce_symlinksifowner = 2" + fix_motivation) return ChkResult(OK, f"fs.enforce_symlinksifowner = {symlinks_if_owner}") def binary_check(params): module_name = params["name"].lower() link = cldiag_doc_link + "#check-" + module_name fix_motivation = ( " Fix that issue to be sure that users run their sites inside CageFS and provide stable " f"work of sites that are using apache {module_name} module. This may improve server security" f"\nSee details: {link}" ) if not os.path.exists("/usr/sbin/cagefsctl"): return ChkResult(SKIPPED, "Cagefs is not installed") if not wrapper(params["status_function"]): return ChkResult(SKIPPED, f"{params['name']} is not enabled") has_jail = detect.check_binary_has_jail(params["location"]) if has_jail is None: return ChkResult( SKIPPED, f"Unable to check {params['name']} module binary for " "custom control panel. This feature may be added in future updates.", ) if not has_jail: return ChkResult(FAILED, "Binary without CageFS jail " + fix_motivation) return ChkResult(OK, "binary has jail") @pretty_name("Check suexec has cagefs jail") def check_suexec(): # Check that LiteSpeed is installed and run if detect.detect_litespeed() and is_litespeed_running(): return ChkResult( SKIPPED, "Current PHP selector uses LiteSpeed, which doesn't require the patches in suEXEC bin." ) return binary_check(BINARY_CHECK_PARAMETERS["suexec"]) @pretty_name("Check suphp has cagefs jail") def check_suphp(): return binary_check(BINARY_CHECK_PARAMETERS["suphp"]) @pretty_name("Check usepam in sshd config") def check_use_pam(): fix_motivation = ( "Fix the issue to provide correct work of pam_lve module with sshd and " f"CageFS ssh sessions\nSee details: {cldiag_doc_link + '#check-usepam'}" ) check_result = detect.check_SSHd_UsePAM() if check_result is None: return ChkResult(SKIPPED, 'Unable to run "/usr/sbin/sshd -T"') if check_result: return ChkResult(OK, "Config is fine") return ChkResult(FAILED, 'There is "usepam no" in "/usr/sbin/sshd -T" output ' + fix_motivation) @pretty_name("Check the validity of LVE limits on server") @skip_check_without_lve def check_lve_limits(): # type: () -> ChkResult """ Validate lve limits """ doc_link = "https://docs.cloudlinux.com/lve-limits-validation.html" failed_message = "Invalid LVE limits on server. See doc: " + doc_link passed_message = "Valid LVE limits on server." limits_validator = LimitsValidator() result = limits_validator.validate_existing_limits() if result is None: return ChkResult(OK, passed_message) return ChkResult(FAILED, failed_message + "\n" + result) @pretty_name("Check compatibility for PHP Selector") def check_phpselector(): """ 1. mod_ruid not present 2. suphp 3. mod_lsapi 4. suexec and (fcgi or cgi) 5. litespeed 6. do not support other """ ok_prefix = "It looks ok [%s]" fail_prefix = ( "Looks like your PHP handler doesn't support CloudLinux PHP Selector " "and as a result does not work http://docs.cloudlinux.com/index.html?compatiblity_matrix.html [%s]" f"\nPlease, see: {cldiag_doc_link + '#check-phpselector'} and try to fix issue to have working selector" ) # LU-3041: no support for PHP-Selector on Ubuntu is_ubuntu_os = is_ubuntu() if is_ubuntu_os: return ChkResult(SKIPPED, "PHP Selector is not supported. Skipping check") # do not check for EA3 if not os.path.exists("/etc/cpanel/ea4/is_ea4"): return ChkResult(SKIPPED, "It is not cPanel with EA4, can diag nothing") # litespeed check if detect.detect_litespeed() and is_litespeed_running(): return ChkResult(OK, ok_prefix % "Litespeed") status = {"suexec": False, "suphp": False, "lsapi": False} handler = None # check /etc/cpanel/ea4/php.conf for EA4 conf_path = "/etc/cpanel/ea4/php.conf" if os.path.exists(conf_path): try: with open(conf_path, "r", encoding="utf-8") as f: config = [line.strip() for line in f] except IOError as e: err = f"Can not read {conf_path} ({e})" return ChkResult(FAILED, fail_prefix % err) # some stub version string for line in config: if line.startswith("default:"): default_ver = (line.split(":")[1]).strip() break else: err = f"{conf_path} config should have default php version" return ChkResult(FAILED, fail_prefix % err) for line in config: if line.startswith(f"{default_ver}:"): handler = (line.split(":")[1]).strip() if handler not in ["cgi", "fcgi", "suphp", "lsapi"]: err = f"doesn't support {handler} handler in ea4/php.conf" return ChkResult(FAILED, fail_prefix % err) modules = detect.get_apache_modules() if modules is not None: if "ruid2_module" in modules: return ChkResult( FAILED, fail_prefix % "It looks like you use mod_ruid. CloudLinux PHP Selector doesn't work properly with it. " "How to delete mod_ruid and install mod_suexec in cPanel " "https://docs.cloudlinux.com/cloudlinux_os_components/#installation-5", ) status["suphp"] = "suphp_module" in modules status["lsapi"] = "lsapi_module" in modules status["suexec"] = "suexec_module" in modules if not any([status["suphp"], status["suexec"]]): return ChkResult( FAILED, fail_prefix % "It looks like you do not have mod_suphp or mod_suexec installed. " "CloudLinux PHP Selector doesn't work properly without it", ) if status["suphp"] or status["suexec"] and handler in ["suphp", "cgi", "fcgi", "lsapi"]: current = ( f"php.conf:{handler} with {', '.join(module for module, is_installed in status.items() if is_installed)}" ) return ChkResult(OK, ok_prefix % current) err = ( "Some unknown php handler, perhaps we don't support it " f"[found handler: {'-' if handler is None else handler} " f"and apache modules: {', '.join(module for module, is_installed in status.items() if is_installed)}]" ) return ChkResult(FAILED, fail_prefix % err) @pretty_name("Check fs.symlinkown_gid") @skip_check_without_lve def check_symlinkowngid(): fix_motivation = ( "Fix the issue to provide symlink protection for apache user " "and as a result make your Web Server more secure. " f"\nSee details: {cldiag_doc_link + '#check-symlinkowngid'}" ) ok_res = ChkResult(OK, "Web-server user is protected by Symlink Owner Match Protection") warn_msg_tpl = "Web-server user '{}' is not in protected group " "specified in {}. " + fix_motivation symlinkown_gid_file = "/proc/sys/fs/symlinkown_gid" if detect.is_openvz(): return ChkResult(SKIPPED, "Not supported for OpenVZ environment") detect.get_apache_gid() # This function fills few module-level variables apache_uname = detect.APACHE_UNAME try: pwd.getpwnam(apache_uname) except KeyError: return ChkResult( SKIPPED, f"There is no web-server user [{apache_uname}] in system. " "Nothing to check" ) try: # current_symlinkown_gid = int(open(symlinkown_gid_file).read().strip()) with open(symlinkown_gid_file, encoding="utf-8") as f: current_symlinkown_gid = int(f.read().strip()) except Exception as e: return ChkResult(FAILED, f"Can't read GID from {symlinkown_gid_file} with error: {e}") if detect.APACHE_GID == current_symlinkown_gid: return ok_res try: grp_members = grp.getgrgid(current_symlinkown_gid).gr_mem except KeyError: # no such group grp_members = [] if grp_members: # Most often both LiteSpeed and Apache runs under the same user if apache_uname in grp_members: return ok_res return ChkResult(FAILED, warn_msg_tpl.format(apache_uname, symlinkown_gid_file)) @pretty_name("Check existence of all user's packages") @skip_checker_on_cl_solo def check_existence_of_all_users_packages(): """ Return user's packages that do not exist in /var/cpanel/packages/ """ packages_dir_path = "/var/cpanel/packages/" users_dir_path = "/var/cpanel/users/" suspended_dir_path = "/var/cpanel/suspended/" excluded_packages_names = ["undefined", "default", "cPanel Ticket System temporary user", "Custom"] user_plan_cmd = ["/bin/grep", "-e", "PLAN=", "-r"] suspended_users = [] if detect.getCPName() != "cPanel": return ChkResult(SKIPPED, "should be run on cPanel only") if not os.listdir(users_dir_path): return ChkResult(SKIPPED, "no users on this server") if os.path.exists(suspended_dir_path): suspended_users = os.listdir(suspended_dir_path) # getting users packages with subprocess.Popen( user_plan_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=users_dir_path, text=True, ) as proc: std_out, std_err = proc.communicate() ret_code = proc.returncode if ret_code != 0: msg = f"error getting user's packages: {std_err}" return ChkResult(FAILED, msg) try: # std_out sample: <username1>:PLAN=<package_name>\n <username2>:PLAN=<package_name>\n... all_users_packages = [ (plan.split("=")[0].split(":")[0], plan.split("=")[1].strip()) for plan in std_out.strip().split("\n") ] # filter suspended users all_users_packages = [(user, pkg) for user, pkg in all_users_packages if user not in suspended_users] except Exception as e: msg = f"error processing user's packages: {e}" return ChkResult(FAILED, msg) # getting exists packages exists_packages = [ package for package in os.listdir(packages_dir_path) if os.path.isfile(os.path.join(packages_dir_path, package)) ] # getting not exists users packages not_exists_users_packages = [ f"{user}: {package}" for user, package in all_users_packages if package not in excluded_packages_names and package not in exists_packages ] if not_exists_users_packages: msg = ( "Found some nonexistent user's packages. " f'List of "user: package" separated by semicolon: {"; ".join(not_exists_users_packages)}. ' "If you want to apply package limits for those users - assign existing packages to them, " "otherwise limits will be applied incorrectly or not applied at all." ) return ChkResult(FAILED, msg) return ChkResult(OK, "nonexistent user's packages aren't found") @pretty_name("Check all resellers's packages files") @skip_checker_on_cl_solo def check_da_resellers_packages_files(): """ Check reseller packages files reading on any errors Caused by LU-2374 """ if detect.getCPName() != "DirectAdmin": return ChkResult(SKIPPED, "should be run on DirectAdmin only") class HiddenPrints: """ Redirect stdout to /dev/null to hide output """ def __enter__(self): self._original_stdout = sys.stdout # pylint: disable=attribute-defined-outside-init sys.stdout = open(os.devnull, "w", encoding="utf-8") def __exit__(self, exc_type, exc_val, exc_tb): sys.stdout.close() sys.stdout = self._original_stdout from clcontrollib import DirectAdmin # pylint: disable=import-outside-toplevel try: with HiddenPrints(): DirectAdmin().list_resellers_packages() return ChkResult(OK, "all resellers packages are written in correct encoding") except Exception as e: return ChkResult(FAILED, str(e)) DEFAULTS_CFG_PATH = "/etc/cl.selector/defaults.cfg" PHP_CONF_PATH = "/etc/cl.selector/php.conf" PARAM_NAME_LIST = ["Directive", "Default", "Type", "Comment", "Range", "Remark"] TYPES = ["value", "list", "bool"] def parse_php_conf(): """ Parse php.conf and split it into blocks by empty line :return: """ line_blocks = [] block_index = 0 # we can`t just ignore empty lines # new_block flag helps to define when empty line is delimiter of block # and when it is just unnecessary line # 1st non-empty and non-comment line is new block by default new_block = True with open(PHP_CONF_PATH, "r", encoding="utf-8") as conf: data = conf.readlines() for line in data: if line.startswith("#"): continue if len(line.strip()) > 0: # found 1st non-empty line # consider next lines as part of block, until empty line found new_block = False try: line_blocks[block_index] except Exception: line_blocks.append([]) line_blocks[block_index].append(line.strip()) elif not new_block: # empty line found # ignore more than one empty line new_block = True block_index += 1 return line_blocks def check_block(block): result = True msg = "" for line in block: line_parts = line.split("=") if line_parts[0].strip() not in PARAM_NAME_LIST: result = False msg = msg + f"\nBlock {block_to_string(block)} has wrong param \n" if line_parts[0].strip() == "Type": if line_parts[1].strip() not in TYPES: result = False msg = msg + f"\nBlock {block_to_string(block)} has wrong directive \n" return [result, msg] def block_to_string(block): res_string = "\n" for line in block: res_string = res_string + str(line) + "\n" return res_string @pretty_name("Checking /etc/cl.selector/php.conf") def check_php_conf(): php_ini_doc_link = "https://docs.cloudlinux.com/custom_php_ini_options.html" fix_motivation = ( "To fix the issue provide valid format for /etc/cl.selector/php.conf file. " "It is used for PHP Selector and invalid format lead to directives misconfiguration " "and as a result misconfiguration of selector" f"\nPlease, read more about php.conf file in {php_ini_doc_link}" ) result = True msg = "" if not os.path.exists(PHP_CONF_PATH): return ChkResult(SKIPPED, f"File {PHP_CONF_PATH} does not exist\n") blocks = parse_php_conf() for block in blocks: r1, msg1 = check_block(block) result = result and r1 if msg1: msg = msg + "\n" + msg1 if not result: return ChkResult(FAILED, msg + fix_motivation) return ChkResult(OK, "Ok") @pretty_name("Checking /etc/cl.selector/defaults.cfg") def check_defaults_cfg(): fix_motivation = ( "Details: this config file is used by php selector and stores it`s global options, " "so it is important to keep needed configurations and valid syntax for PHP modules " "settings to avoid selector`s misconfiguration" f"\nSee details: {cldiag_doc_link + '#cldiag'}" ) if not os.path.exists(DEFAULTS_CFG_PATH): return ChkResult(SKIPPED, f"{DEFAULTS_CFG_PATH} does not exist") try: defaults_cfg = configparser.ConfigParser(interpolation=None, strict=False) defaults_cfg.read(DEFAULTS_CFG_PATH) except Exception as e: return ChkResult(FAILED, str(e)) try: default_php_version = defaults_cfg.get("versions", "php") except (configparser.NoOptionError, configparser.NoSectionError): return ChkResult(FAILED, "Default php version is undefined\n" + fix_motivation) for section in defaults_cfg.sections(): if section.startswith("php"): php_version = section[3:] try: state = defaults_cfg.get(section, "state") except configparser.NoOptionError: state = "enable" try: modules = defaults_cfg.get(section, "modules") except configparser.NoOptionError: modules = "" if default_php_version == php_version and state == "disabled": return ChkResult(FAILED, f"Default php version {php_version} is disabled\n{fix_motivation}") if modules: if "," in modules: module_names = modules.split(",") for name in module_names: if not name: sys.stderr.write(f"Warning: Modules list for version {php_version} is strange\n") return ChkResult(OK, "OK") @pretty_name("Checking domains compatibility") def check_domains_compatibility(): if detect.getCPName() != "cPanel": return ChkResult(SKIPPED, "should be run on cPanel only") failed_message = ( "Some domains/subdomains don't use PHP Selector because they have a non-system default " "version (in MultiPHP Manager) or PHP_FPM enabled. You can find their list on domains tab " "and pass control to PHP Selector if necessary." ) passed_message = "Ok" result = domains_compatibility_checker() if result is None: return ChkResult(OK, passed_message) return ChkResult(FAILED, failed_message) def domains_compatibility_checker(): try: domains = WhmApiRequest("php_get_vhost_versions").call() system_version = WhmApiRequest("php_get_system_default_version").call() except WhmApiError: return None for domain in domains.get("versions"): if system_version.get("version") != domain.get("version") or domain.get("php_fpm"): return "Incompatible version" def get_dir_mountpoint(dirpath: str) -> Optional[str]: """ Get mountpoint for dirpath directory from output of df -h {dirpath} utility. """ if not os.path.isdir(dirpath): return None get_mountpoint_cmd = f"df -h {dirpath}" process = subprocess.run(get_mountpoint_cmd.split(" "), stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=False) # example stdout: # [root@localhost ~] # df -h /var/cagefs # Filesystem Size Used Avail Use% Mounted on # /dev/vda1 10G 3.1G 7.0G 31% / if process.returncode != 0: return None try: mounted_on = process.stdout.split("\n")[1].split(" ")[-1] except IndexError: return None return mounted_on def get_max_uid() -> int: """ Returns maximum uid from /etc/login.defs If file does not exist returns 60000 """ max_uid = 60000 if os.path.isfile("/etc/login.defs"): with open("/etc/login.defs", encoding="utf-8") as f: for line in f: if line.startswith("UID_MAX "): max_uid = int(line.split(" ")[-1]) return max_uid def get_min_uid() -> int: """ Returns min cagefs uid """ get_min_uid_cmd = "/usr/sbin/cagefsctl --get-min-uid" stdout = run_command(get_min_uid_cmd.split(" "), convert_to_str=True) min_uid = int(stdout) return min_uid def useradd(username: str) -> int: """ Creates user with max available uid that greater than min cagefs uid and less than max system uid. Does nothing if user already exists. """ min_uid = get_min_uid() max_uid = get_max_uid() if max_uid < min_uid: raise RuntimeError(f"Can't create {username} user: " f"min_uid {min_uid} is greater than max_uid {max_uid}") clpwd = ClPwd(min_uid=min_uid) if username in clpwd.get_user_full_dict(): return clpwd.get_uid(username) if not clpwd.get_uid_dict(): custom_uid = min_uid else: custom_uid = max_uid used_uids_dict = clpwd.get_uid_dict() for _uid in range(min_uid, max_uid): if _uid not in used_uids_dict: custom_uid = _uid break if custom_uid >= max_uid: raise RuntimeError(f"Can't create {username} user: uid {custom_uid} is too big") useradd_cmd = f"/usr/sbin/useradd -s /bin/false -u {custom_uid} -m {username}" returncode, _, err = run_command(useradd_cmd.split(" "), return_full_output=True) if returncode != 0: raise RuntimeError(err) return custom_uid def get_username_from_file() -> Optional[str]: """ Retrive cldiag username from file :return: username from file or None """ try: with open(_CLDIAG_USERNAME_FILE, encoding="utf-8") as f: return f.read().strip() except (OSError, IOError): # No file pass return None def remove_all_trash_cldiag_users(): """ Remove all trash cldiag users from system """ cl_pwd = ClPwd() re_pattern = re.compile("^cldiaguser_[a-f0-9]{21}$") users_dict = cl_pwd.get_user_full_dict() for username, _ in users_dict.items(): # skip non-test users if not re_pattern.match(username): continue try: userdel_cmd = f"/usr/sbin/userdel -r {username}" run_command(userdel_cmd.split(" ")) except (OSError, IOError, ExternalProgramFailed): pass def is_quota_active(): """ Detect quota is activated :return: True/False - quotas activated/not activated """ _REPQUOTA_PATH = "/usr/sbin/repquota" cmd = [_REPQUOTA_PATH, "-nva"] try: stdout = run_command(cmd) except ExternalProgramFailed: return False # quotas not supported if repqouta returns nothing if not stdout: return False return True @pretty_name("Checking if /var/cagefs is located on partition with disk quota enabled") @skip_checker_on_cl_solo # TODO: Remove this skip if cl-quota will be enabled on CL Solo def check_cagefs_partition_disk_quota(): """ Checker for check if /var/cagefs is located on partition with disk quota enabled. Algorithm for check: we trying to set cldiaguser's quota to 1 inode (so that this user can't create any file if the quota activated on this partition). Then we change uid of process to cldiaguser's uid, and try to create file with his permissions. If we can't create file (Disk quota exceeded) then it's alright and disc quota enabled. Else we warn user to enable quota on that partition. """ ok_message = "/var/cagefs located on partition with quota enabled" failed_message = ( "Details: /var/cagefs located on partition with quota disabled.\n" "Please, activate quota for /var/cagefs for better security.\n" "See details: " "https://docs.cloudlinux.com/cloudlinux_os_components/#installation-and-update-2" ) quota_unworkable_message = ( "Quotas seems unworkable on this server. Please correctly setup quotas to run this checker" ) cagefsctl = "/usr/sbin/cagefsctl" setquota = "/usr/sbin/setquota" # get mountpoint from output of "df -h /var/cagefs" cagefs_mountpoint = get_dir_mountpoint("/var/cagefs") if cagefs_mountpoint is None or not os.path.isdir("/var/cagefs") or not os.path.isfile(cagefsctl): return ChkResult(SKIPPED, "Cagefs is not installed") if not os.path.isdir("/usr/share/cagefs-skeleton/bin"): return ChkResult(SKIPPED, "Cagefs is not initialized") if not is_quota_active(): return ChkResult(FAILED, failed_message) username = None is_testuser_exists = False if os.path.isfile(_CLDIAG_USERNAME_FILE): # Get test username from file username = get_username_from_file() if username is not None: try: user_pw = pwd.getpwnam(username) user_uid, user_gid = user_pw.pw_uid, user_pw.pw_gid except KeyError: # test user absent pass else: is_testuser_exists = True else: # File absent - clean system from trash (old) test users remove_all_trash_cldiag_users() if not is_testuser_exists: username = f"{_CLDIAG_TEST_USENAME_PREFIX}_{uuid.uuid4().hex}"[:32] useradd(username) user_pw = pwd.getpwnam(username) user_uid, user_gid = user_pw.pw_uid, user_pw.pw_gid try: # save username to file with open(_CLDIAG_USERNAME_FILE, "w", encoding="utf-8") as f: f.write(username) except (OSError, IOError): pass create_cagefs_dir_cmd = f"{cagefsctl} --cpetc {user_uid}" set_quota_limit_cmd = f"{setquota} -u {username} 0 0 1 1 {cagefs_mountpoint}" reset_quota_limit_cmd = f"{setquota} -u {username} 0 0 0 0 {cagefs_mountpoint}" try: try: prefix = f"{user_uid % 100:02d}" # getting last 2 digits from uid tempfile_dir = f"/var/cagefs/{prefix}/{username}/etc/cl.selector/" tempfile_name = str(random.random()) tempfile_full_path = Path(tempfile_dir, tempfile_name) if not os.path.isdir(tempfile_dir): run_command(create_cagefs_dir_cmd.split(" ")) # creating user directory run_command(set_quota_limit_cmd.split(" ")) # setting quota limit to 1 inode # trying to create empty file on the partition with quota # and expect it fails with Disk quota exceeded error with subprocess.Popen( ["/bin/touch", tempfile_name], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, start_new_session=True, cwd=tempfile_dir, preexec_fn=demote(user_uid, user_gid), env={**os.environ, **{"LC_ALL": "C"}}, ) as proc: stdout, _ = proc.communicate() with drop_privileges(username): if not tempfile_full_path.exists() and "Disk quota exceeded" in stdout: return ChkResult(OK, ok_message) if not tempfile_full_path.exists(): raise OSError(stdout) tempfile_full_path.unlink() finally: run_command(reset_quota_limit_cmd.split(" ")) except ExternalProgramFailed: return ChkResult(FAILED, quota_unworkable_message) return ChkResult(FAILED, failed_message) def get_short_error_message(error, detailed_out, max_error_lines=10): """ Handles error message making it shorter, if it is bigger than max limit :param error: error message to make shorter :param detailed_out: way for user to get full error manually :param max_error_lines: max lines for error :return: initial error (less than 10 lines) short error """ error_lines = error.split("\n") if len(error_lines) > max_error_lines: return "\n".join( error_lines[: max_error_lines // 2] + ["..."] + error_lines[-max_error_lines // 2 :] + [detailed_out] ) return error def is_email_notification_enabled() -> bool: """ Return true if automatic cldiag email notifications about problems enabled. """ enable_cldiag = detect.get_boolean_param(detect.CL_CONFIG_FILE, "ENABLE_CLDIAG", separator="=", default_val=True) return enable_cldiag def get_list_of_disabled_cron_checkers() -> List[Optional[AnyStr]]: """ Get list of disabled cldiag checkers which run by cron from /etc/sysconfig/cloudlinux """ try: config = configparser.ConfigParser( interpolation=None, strict=False, defaults={ cron_cldiag_checkers_param_name: "", }, ) config.read(detect.CL_CONFIG_FILE) result = config.get( cron_cldiag_section_name, cron_cldiag_checkers_param_name, ) except configparser.Error: return [] return [item.strip() for item in result.strip().split(",") if item] def set_list_of_disabled_cron_checkers(disabled_cron_cherkers: List[Optional[AnyStr]]) -> None: """ Set list of disabled cldiag checker which run by cron in /etc/sysconfig/cloudlinux """ try: config = configparser.ConfigParser( interpolation=None, strict=False, ) config.read(detect.CL_CONFIG_FILE) if cron_cldiag_section_name not in config.sections(): config.add_section(cron_cldiag_section_name) current_disabled_checkers = get_list_of_disabled_cron_checkers() if disabled_cron_cherkers: disabled_cron_cherkers.extend(current_disabled_checkers) config.set( cron_cldiag_section_name, cron_cldiag_checkers_param_name, ",".join(disabled_cron_cherkers), ) with open(detect.CL_CONFIG_FILE, "w+", encoding="utf-8") as f: config.write(f) except (configparser.Error, IOError, OSError) as err: print(f"Can't set list of disabled cron checkers to config" f'"{detect.CL_CONFIG_FILE}" because "{err}"') print("Please check config's existence, integrity and permissions") print(write_to_support_msg) sys.exit(1) @pretty_name("Check mount with hidepid=2 option") @skip_check_without_lve def check_hidepid(): # type: () -> ChkResult """ Check if system mounted with hidepid=2 option """ hidepid_doc_link = "https://docs.cloudlinux.com/cloudlinux_os_kernel/#remounting-procfs-with-hidepid-option" fix_motivation = ( f"Details: hidepid protection disabled.\n" f"Please, mount system with hidepid=2 for better security.\n" f"Read more about hidepid option here: {hidepid_doc_link}" ) passed_message = "hidepid protection enabled" skipped_message = "Cagefs is not installed" # this checker only works with cagefs installed if not os.path.isfile("/usr/sbin/cagefsctl"): return ChkResult(SKIPPED, skipped_message) # looking for the line like this in /proc/mounts # proc /proc proc rw,nosuid,nodev,noexec,relatime,gid=1000,hidepid=2 0 0 # if such line does no exist, then it's bad for user if get_hidepid_typing_from_mounts() != 2: return ChkResult(FAILED, fix_motivation) return ChkResult(OK, passed_message) @pretty_name("Check user's low PMEM limits") @skip_check_without_lve def check_low_pmem_limits() -> ChkResult: """ Checks low PMEM limits availability on server """ doc_link = "https://docs.cloudlinux.com/limits/#limits-validation" failed_message = "Some user(s) on server has low PMEM LVE limit (lower than 512 MB). See doc: " + doc_link passed_message = "Check low PMEM limits passed" result = LimitsValidator.is_low_pmem_limit_present() if result: return ChkResult(FAILED, failed_message) return ChkResult(OK, passed_message)