404

[ Avaa Bypassed ]




Upload:

Command:

elspacio@3.142.54.169: ~ $
# coding:utf-8

# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT

from __future__ import print_function
from __future__ import absolute_import
from __future__ import division
import errno
import os
import pwd
import re
import subprocess
import sys
import json
import shutil

from time import time
from typing import AnyStr, List, Dict, Optional  # NOQA
from pathlib import Path

from clcommon.clpwd import resolve_username_and_doc_root
from clcommon.cpapi import CP_NAME
from clcommon.cpapi.cpapiexceptions import NoDomain, NotSupported, IncorrectData
from clcommon.utils import mod_makedirs
from clsentry import init_sentry_client
from clsentry.utils import get_pkg_version

from .clselectprint import clprint
from .clselectexcept import ClSelectExcept


CAGEFS_ENTER_USER_BIN = '/sbin/cagefs_enter_user'
LVEMANDSN = "https://9713d1296f804031b058b8f2d789d7ac:8ddacae32d8246cf8b25cf826bf3fc0a@cl.sentry.cloudlinux.com/12"

def run_command(cmd, env_data=None):
    """
    Runs external process and returns output
    :param cmd: command and arguments as a list
    :param env_data
    :return string
    """
    try:
        output = subprocess.Popen(
            cmd,
            stdin=open('/dev/null'),
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            close_fds=True,
            env=env_data,
            text=True)
        std_out, std_err = output.communicate()
    except (OSError, IOError) as e:
        raise ClSelectExcept.FileProcessError(cmd[0], str(e))
    if output.returncode != 0:
        raise ClSelectExcept.ExternalProgramFailed(std_err or 'output of the command: %s\n%s' % (' '.join(cmd),
                                                                                                 std_out))
    return std_out


def run_command_full(cmd, env_data=None, preexec_fn=None, cwd=None):
    """
    Runs external process and returns output.
    Differs from subprocess.check_output, run_command above, and check_output below in that it does not
     throw an exception if process's return code != 0
    :param cmd: command and arguments as a list
    :param env_data
    :param preexec_fn: Pre-exec function. None if don't need
    :param cwd: Directory name to set as current
    :return Cortege: (ret_code, stdout, stderr)
    """
    try:
        process = subprocess.Popen(
            cmd,
            stdin=open('/dev/null'),
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            preexec_fn=preexec_fn,
            close_fds=True,
            cwd=cwd,
            env=env_data)
        std_out, std_err = process.communicate()
    except (OSError, IOError) as e:
        raise ClSelectExcept.FileProcessError(cmd[0], str(e))
    return process.returncode, std_out, std_err


def read_file_as_string(filename):
    """
    Reads file contents and returns it as a string
    """
    defaults_contents = ""
    try:
        f = open(filename)
        defaults_contents = f.read()
        f.close()
    except (OSError, IOError):
        pass
    return defaults_contents


def check_call(*args, **kwargs):
    check_output(*args, **kwargs)


def check_output(*args, **kwargs):
    """
    check_output(val1, val2, val3, arg4=val4)
    equivalent
    check_output(args=(val1, val2, val3), arg4=val4)
    or equivalent
    check_output(val1, args=(val2, val3), arg4=val4)
    DON'T USE check_output((val1, val2, val3), arg4=val4)
    :param tuple *args: arguments for command line
    :param dict **kwargs: parameters for subprocess.Popen
    :return str:
    """
    error = kwargs.pop('error', None)
    args_from_kwargs = kwargs.get('args')
    output = kwargs.pop('output', None)
    wait = kwargs.pop('wait', None)
    if args_from_kwargs:
        args += args_from_kwargs
        del kwargs['args']
    try:
        p = subprocess.Popen(args=args, stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE, text=True, **kwargs)
    except (OSError, IOError) as e:
        raise ClSelectExcept.FileProcessError(args[0], str(e))
    stdout, stderr = p.communicate()
    if output:
        output.write(stdout)
        output.close()
    if wait is not None:
        p.wait()
    if p.returncode:
        raise ClSelectExcept.ExternalProgramFailed(error or stderr or stdout)
    return stdout


def list_dirs(path):
    return [d for d in os.listdir(path)
            if os.path.isdir(os.path.join(path, d))]


def versioning(path, versions):
    yield path
    for major, minors in versions:
        yield path + str(major)
        for minor in minors:
            yield path + '.'.join(map(str, (major, minor)))


def mkdir_p(path):
    try:
        os.makedirs(path)
    except OSError as exc:
        if exc.errno == errno.EEXIST and os.path.isdir(path):
            pass
        else:
            raise


def get_abs_rel(user, directory):
    """
    Returns absolute and relative to user's home directories paths,
    after eliminating all symlinks
    :param user: Unix user whose home contains the directory
    :param directory: The directory to obtain absolute and relative paths for
    """
    user_dir = os.path.realpath(pwd.getpwnam(user).pw_dir)
    abs_dir = os.path.realpath(os.path.join(user_dir, directory))
    rel_dir = s_partition(abs_dir, user_dir)[2].lstrip(os.sep)
    if not all((abs_dir, rel_dir)):
        raise ClSelectExcept.WrongData(
            "Directory '%s' not in user home ('%s')" % (directory, user_dir))
    return abs_dir, rel_dir


def in_cagefs():
    return os.path.isdir('/var/.cagefs')


# TODO: need move to clcommon
# safely open/close files in python2.4
# http://stackoverflow.com/questions/3770348/how-to-safely-open-close-files-in-python-2-4
# TODO: consider to remove this method and use builtin
def file_read(path, mode='r', errors=None):
    """
    read file and close
    :param path: file path
    :param mode: reading mode
    :return str:
    """
    data = None
    stream = open(path, mode=mode, errors=errors)
    try:
        data = stream.read()
    finally:
        stream.close()
    return data


def file_readlines(path, errors=None):
    """
    read litle file and close
    :param path:
    :return str:
    """
    data = []
    stream = open(path, errors=errors)
    try:
        data = stream.readlines()
    finally:
        stream.close()
    return data


def file_write(path, line, mode='w', errors=None):
    """
    write litle data to file and close
    :param path:
    :return str:
    """
    stream = open(path, mode, errors=errors)
    try:
        stream.write(line)
    finally:
        stream.close()


def file_writelines(path, lines, mode='w', errors=None):
    """
    write litle data to file and close
    :param path:
    :return str:
    """
    stream = open(path, mode, errors=errors)
    try:
        stream.writelines(lines)
    finally:
        stream.close()


def s_partition(s, sep):
    """
    str.partition for all python versions
    :param s:		string to parse
    :param sep:		separator to split by
    :return:		cortege - see str.partition function for python 2.5+
    """
    if sys.version_info[0] >= 2 and sys.version_info[1] >= 5:
        return s.partition(sep)
    # Python v2.4 and lower does not support str.partition, so emulating it
    s_parts = s.split(sep, 1)
    if len(s_parts) == 2:
        ret_cortege = (s_parts[0], sep, s_parts[1])
    else:
        ret_cortege = (s_parts[0], '', '')
    return ret_cortege


def grep(pattern, path):
    """
    find regex count in file
    """
    try:
        f = open(path, "r")
    except IOError:
        return None

    counter = 0
    regex = re.compile(pattern)
    for line in f:
        if regex.search(line):
            counter += 1

    f.close()
    return counter


def user_should_be_skipped(username):
    """
    Determines is user should be skipped in selectorctl work
    :param username: user name to check
    :return: True - user should be skipped, False - not
    """
    from clcommon.cpapi import is_admin
    # user is DA admin, pass it
    return is_admin(username)


def make_dir(dest_dir):
    if not os.path.isdir(dest_dir):
        try:
            mod_makedirs(dest_dir, 0o755)
        except (IOError, OSError) as e:
            print('Error: failed to create', dest_dir, ':', str(e))
            sys.exit(1)


def remove_file_or_dir(path):
    try:
        os.unlink(path)
    except OSError:
        pass
    if os.path.isdir(path):
        shutil.rmtree(path, True)


def make_symlink(src, dst):
    try:
        if os.path.islink(dst):
            if os.readlink(dst) != src:
                os.unlink(dst)
                os.symlink(src, dst)
        else:
            remove_file_or_dir(dst)
            os.symlink(src, dst)
    except OSError as e:
        print('Error: failed to create symlink', dst, '->', src, ':', str(e))
        sys.exit(1)


def pretty_json(data):
    return json.dumps(data, indent=4, separators=(',', ': '), sort_keys=True)


# TODO: we have the same safely_resolve_username_and_doc_root in selectorlib, need some refactoring
def safely_resolve_username_and_doc_root(user=None, domain=None, msg_format='text', pass_not_supported_panel=False):
    """
    Safely resolve username and doc_root by domain,
    or resolve document root by username,
    or resolve document root and username by effective uid
    :param pass_not_supported_panel: pass not supported panel for php selector
    :param msg_format: error messages' format (json or text)
    :param user: str -> name of unix user
    :param domain: str -> domain of panel user
    :return: tuple -> user, doc_root
    """

    result_user = user
    result_doc_root = None
    try:
        result_user, result_doc_root = resolve_username_and_doc_root(
            user=user,
            domain=domain,
        )
    except NoDomain:
        clprint.print_diag(
            msg_format,
            {
                'status': 'ERROR',
                'message': 'No domain found for user: {}, domain: {}'.format(user, domain)
            }
        )
        sys.exit(1)
    except NotSupported:
        if pass_not_supported_panel:
            pass
        else:
            clprint.print_diag(
                msg_format,
                {
                    'status': 'ERROR',
                    'message': 'NodeJs/Ruby/Python selector not supported for %s panel' % CP_NAME
                }
            )
            sys.exit(1)
    except IncorrectData:
        clprint.print_diag(
            msg_format,
            {
                'status': 'ERROR',
                'message': 'Domain %s is not owned by the user %s' % (domain, user)
            }
        )
        sys.exit(1)

    return result_user, result_doc_root


def get_using_realpath_keys(user, directory, config):
    """
    Transform all directories to realpath, compare and return config value
    :param user: str -> unix user - owner of the config
    :param directory: str -> wanted directory key
    :param config: dict -> config to get value from,
        should have directories as keys
    """
    abs_directory = get_abs_rel(user, directory)[0]
    for conf_dir in config:
        abs_conf_dir = get_abs_rel(user, conf_dir)[0]
        if abs_directory == abs_conf_dir:
            return config[conf_dir]

    raise KeyError(
        'Config does not contain directory: {}'.format(directory))


def delete_using_realpath_keys(user, directory, config):
    """
    Transform all directories to realpath, compare and remove directory key
    :param user: str -> unix user - owner of the config
    :param directory: str -> wanted directory key
    :param config: dict -> config to remove key from,
        should have directories as keys
    """
    abs_directory = get_abs_rel(user, directory)[0]
    removed_at_least_one = False
    config_keys = list(config.keys())
    for conf_dir in config_keys:
        abs_conf_dir = get_abs_rel(user, conf_dir)[0]
        if abs_directory == abs_conf_dir:
            del config[conf_dir]
            removed_at_least_one = True
    if not removed_at_least_one:
        raise KeyError(
            'Config does not contain directory: {}'.format(directory))


def realpaths_are_equal(user, path1, path2):
    """
    Checks that two paths in user home directory are the same after
    stripping symlinks
    :param user: Unix user whose home directory contains both paths
    :type path1: str
    :type path2: str
    :return: True if paths are canonically the same otherwise False
    :rtype: bool
    """
    return get_abs_rel(user, path1)[0] == get_abs_rel(user, path2)[0]


def run_process_in_cagefs(user, path_to_utility, args_for_utility, env_vars=None):
    # type: (AnyStr, AnyStr, List[AnyStr], Optional[Dict[AnyStr, AnyStr]]) -> Dict[AnyStr, AnyStr, int]
    """
    Run any process using the utility cagefs_enter_user
    """
    result = {}

    # there are cases when we need to store some env vars
    # (e.g HOME when calling pip during import)
    if not env_vars:
        env_vars = {}
    cmd = [CAGEFS_ENTER_USER_BIN, user, path_to_utility] + args_for_utility
    process = subprocess.Popen(cmd, env=env_vars,
                               stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE,
                               text=True)
    stdout, _ = process.communicate()
    retcode = process.returncode
    # contition `retcode > 128` because in lve-wrappers:
    # if (WIFSIGNALED(status)) {
    #     exit(WTERMSIG(status) + 128);
    # }
    if not stdout and retcode > 128:
        stdout = {
            'details': 'User\'s process "{}" failed and didn\'t return anything. It could mean that user '
                       'has too small limit of PMEM for running of cloudlinux-selector. Please, '
                       'increase the limit to user and try run process again'.format(path_to_utility),
            'result': 'error',
            'timestamp': time(),
        }
    else:
        stdout = stdout.strip()
    result['output'] = stdout
    result['returncode'] = retcode
    result['failed'] = True if retcode else False
    return result


def is_imunify_using_python(version: str) -> bool:
    """
    Check if python version is used by imunify packages
    The algorithm is as follows:
    - Verify if imunify services are installed
    - Check if shebang contains the path to the specified Python version being checked
    - If shebang contains the path to the imunify virtual environment,
      determine the Python version it is based on by reading a symlink
    """
    # Init sentry for this function
    sentry_client = init_sentry_client(
        'lvemanager',
        release=get_pkg_version('lvemanager'),
        dsn=LVEMANDSN,
        handle=False
    )

    alt_path = f'/opt/alt/python{version.replace(".", "")}'
    imunify_venv_path = '/opt/imunify360/venv'
    imunify_service = Path('/usr/bin/imunify-service')
    if not imunify_service.is_file():
        return False

    try:
        with open(imunify_service) as f:
            first_line = f.readline()

        if not first_line.startswith('#!'):
            raise Exception(f'ERROR: "{imunify_service}" doesn\'t have a shebang')

        if alt_path in first_line:
            return True

        if imunify_venv_path in first_line:
            venv_python_bin = Path(first_line.lstrip('#!').strip())
            python_bin = venv_python_bin.readlink()
            if alt_path in str(python_bin):
                return True
    except:
        sentry_client.captureException()

    return False


def demote(user: str):
    """
    Drops to user
    """
    user_pwd = pwd.getpwnam(user)

    def func():
        os.setgid(user_pwd.pw_gid)
        os.setuid(user_pwd.pw_uid)
        os.environ['USER'] = user
        os.environ['HOME'] = user_pwd.pw_dir

    return func


def apply_for_at_least_one_user(func, users, exception_type, *args, **kwargs):
    caught_exception = None
    is_handled = False

    for user_alias in users:
        try:
            result = func(user_alias, *args, **kwargs)
        except exception_type as e:
            caught_exception = e
        else:
            is_handled |= True

    if not is_handled:
        raise caught_exception

    return result

Filemanager

Name Type Size Permission Actions
__pycache__ Folder 0755
baseclselect Folder 0755
clselectnodejs Folder 0755
clselectnodejsuser Folder 0755
clselectphp Folder 0755
clselectphpuser Folder 0755
clselectpython Folder 0755
clselectpythonuser Folder 0755
clselectruby Folder 0755
__init__.py File 536 B 0644
clextselect.py File 19.71 KB 0644
clpassenger.py File 26.53 KB 0644
clselect.py File 21.77 KB 0644
clselectctl.py File 9.15 KB 0644
clselectctlnodejsuser.py File 20.71 KB 0644
clselectctlphp.py File 43.37 KB 0644
clselectctlpython.py File 43.87 KB 0644
clselectctlruby.py File 18.59 KB 0644
clselectexcept.py File 10.22 KB 0644
clselectprint.py File 5.39 KB 0644
clselectstatistics.py File 6.51 KB 0644
cluserextselect.py File 15.17 KB 0644
cluseroptselect.py File 23.62 KB 0644
cluserselect.py File 27 KB 0644
locked_extensions.ini File 1.2 KB 0644
utils.py File 16 KB 0644