# coding=utf-8
#
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
import configparser
import io
import os
import json
import fcntl
import struct
import time
from typing import Optional, Dict, AnyStr, Union
from jwt import decode, exceptions
from clcommon.lib.consts import DEFAULT_JWT_ES_TOKEN_PATH
from clcommon.lib.whmapi_lib import WhmApiRequest, WhmApiError
from clcommon.clcaptain import mkdir as mkdir_p
from clcommon.utils import process_is_running
from clconfig.cagefs_statistics_config import check_cagefs_initialized
from cldetectlib import CL_CONFIG_FILE
from secureio import write_file_via_tempfile
def dummy_none_function(*a, **kw):
return None
try:
from clselect.clselectctl import interpreter_versions_short_summary
from clselector.clpassenger_detectlib import is_clpassenger_active
except ImportError:
interpreter_versions_short_summary = dummy_none_function
is_clpassenger_active = dummy_none_function
_CL_STATISTICS_SECTION = "license_check"
_CL_STATISTICS_COLLECT_STATE_OPTION = "cl_statistics_enabled"
_CL_STATISTICS_COLLECT_RPM_STATE_OPTION = "cl_statistics_rpm_enabled"
_CL_STATISTICS_DIR = '/var/lve'
_CL_STATISTICS_SEND_STATUS_FILE = os.path.join(_CL_STATISTICS_DIR, 'summary_status.json')
_CL_STATISTICS_LOCK_PATH = '/var/run/cloudlinux_summary.send.lock'
_CL_STATISTICS_LOCK_FILE = None
CL_PLUS_SENDER_FILE_PATH = '/usr/share/cloudlinux/cl_plus/clplus_sender.py'
ALT_PYTHON_VIRTUALENV_BIN = '/opt/cloudlinux/venv/bin/virtualenv'
class SummaryStatus:
"""
Status of both, collecting and sending statistics
If process still collects statistics -> IN_PROGRESS
If statistics collected and sent correctly -> SUCCESS
If any error during collecting or sending -> FAILED
"""
SUCCESS = 'success'
IN_PROGRESS = 'in_progress'
FAILED = 'failed'
def is_virtualenv_installed():
"""
Checks is virtualenv installed
:return: True/False - installed or not
"""
return os.path.exists(ALT_PYTHON_VIRTUALENV_BIN)
def is_locked(lock_file):
"""
Check if file is locked by another process without acquiring lock.
IMPORTANT! This function should NOT be used to check lock acquired by the
same process that executes the is_locked() function. For example, when
process executes fcntl.lockf(LOCK_FILE), and then the same process executes
is_locked(LOCK_FILE), the is_locked(LOCK_FILE) call returns False.
Use is_locked() function to check lock acquired by another process only.
:param lock_file: file to check lock on
:type lock_file: file object or descriptor
"""
lock_data = struct.pack("hhllhh", fcntl.F_WRLCK, 0, 0, 0, 0, 0)
try:
lock_query = fcntl.fcntl(lock_file, fcntl.F_GETLK, lock_data)
lock_status = struct.unpack("hhllhh", lock_query)[0]
except (OSError, IOError):
# should never happen
return False
return lock_status != fcntl.F_UNLCK
def is_sending_process_running(acquire_lock=False):
"""
Check if processes collecting stats are running already (with --send option in command line)
:param acquire_lock: True = acquire lock when possible
:type acquire_lock: bool
:return bool: True = Processes are running
"""
global _CL_STATISTICS_LOCK_FILE
_CL_STATISTICS_LOCK_FILE = open( # pylint: disable=consider-using-with
_CL_STATISTICS_LOCK_PATH, 'w', encoding='utf-8'
)
if not acquire_lock:
return is_locked(_CL_STATISTICS_LOCK_FILE)
try:
fcntl.lockf(_CL_STATISTICS_LOCK_FILE, fcntl.LOCK_EX | fcntl.LOCK_NB)
except (OSError, IOError):
return True
return False
def _get_config():
"""
Retrieves ConfigParser object for /etc/sysconfig/cloudlinux file
:return:
"""
config = configparser.ConfigParser(interpolation=None,
strict=True)
config.optionxform = str # make config case sensitive
config.read(CL_CONFIG_FILE)
return config
def _write_config(config):
"""
write config to /etc/sysconfig/cloudlinux file
:param config: configParser object to write
return boolean: True|False
"""
try:
string_fp = io.StringIO()
config.write(string_fp)
content = string_fp.getvalue()
write_file_via_tempfile(content, CL_CONFIG_FILE, 0o644, prefix='cloudlinux_')
except (OSError, IOError):
return False
return True
def _get_config_value(parameter: str, default: bool = True) -> bool:
"""
Retrieves parameter's value from /etc/sysconfig/cloudlinux file, _CL_STATISTICS_SECTION section
"""
config = _get_config()
res = default
try:
res = config.getboolean(_CL_STATISTICS_SECTION, parameter)
except (configparser.NoSectionError, configparser.NoOptionError, ValueError):
# Treat absent/missing value as default
pass
return res
def _set_config_value(parameter: str, value: bool) -> None:
"""
Sets parameter's value to /etc/sysconfig/cloudlinux file, _CL_STATISTICS_SECTION section
"""
config = _get_config()
config.set(_CL_STATISTICS_SECTION, parameter, str(int(value)))
_write_config(config)
def is_statistic_enabled() -> bool:
"""
Retrieves statistic collection status from /etc/sysconfig/cloudlinux file
:return: True/False - enabled/disabled
"""
return _get_config_value(_CL_STATISTICS_COLLECT_STATE_OPTION)
def is_statistic_rpm_enabled() -> bool:
"""
Retrieves rpm statistic collection status from /etc/sysconfig/cloudlinux file
:return: True/False - enabled/disabled
"""
return _get_config_value(_CL_STATISTICS_COLLECT_RPM_STATE_OPTION)
def set_statistic_collection_enabled(is_enabled: bool) -> None:
"""
Set statistic collection status to /etc/sysconfig/cloudlinux file
:param is_enabled: True/False - enabled/disabled
"""
_set_config_value(_CL_STATISTICS_COLLECT_STATE_OPTION, is_enabled)
def set_statistic_rpm_collection_enabled(is_enabled: bool) -> None:
"""
Set rpm statistic collection status to /etc/sysconfig/cloudlinux file
:param is_enabled: True/False - enabled/disabled
"""
_set_config_value(_CL_STATISTICS_COLLECT_RPM_STATE_OPTION, is_enabled)
def write_statistics_send_status_to_file(status_dict):
"""
Writes statistics send status to file /var/lve/summary_status.json
:param status_dict: status dictionary for write to file
:return:
"""
try:
if not os.path.exists(_CL_STATISTICS_DIR):
mkdir_p(_CL_STATISTICS_DIR)
content = json.dumps(status_dict)
# Write to file readable only for root
write_file_via_tempfile(content, _CL_STATISTICS_SEND_STATUS_FILE, 0o600, prefix='cloudlinux_')
except (OSError, IOError):
pass
def get_statistics_send_status_from_file():
"""
Retrieves statistics send status from file /var/lve/summary_status.json
:return: Dictionary with last send status. None if any error
"""
status_dict = None
try:
with open(_CL_STATISTICS_SEND_STATUS_FILE, encoding='utf-8') as f:
s_content = f.read()
status_dict = json.loads(s_content)
if status_dict['result'] == SummaryStatus.IN_PROGRESS \
and not is_sending_process_running():
# something went wrong during collection
status_dict['result'] = SummaryStatus.FAILED
status_dict['reason'] = 'Collecting statistics was failed. Error ' \
'report has been sent to developers and will be fixed soon'
except (OSError, IOError, ValueError, AttributeError, TypeError):
pass
return status_dict
def installed_interpreters_list(interpreter):
"""
Returns list of installed interpreters
:param interpreter: str - name of interpreter
:rtype: List of InterpreterSummary
"""
return [i for i in interpreter_versions_short_summary(interpreter) if i.installed]
def is_python_selector_installed():
"""
Checks that python selector is installed
Installed if:
- ea-apache24-mod-alt-passenger or alt-mod-passenger is installed
- alt-python-virtualenv is installed
:rtype: bool
"""
return is_clpassenger_active() and is_virtualenv_installed()
def is_ruby_selector_installed():
"""
Checks that ruby selector is installed
Installed if:
- ea-apache24-mod-alt-passenger or alt-mod-passenger is installed
- alt-python-virtualenv is installed
:rtype: bool
"""
return is_clpassenger_active() and is_virtualenv_installed()
def is_nodejs_selector_installed():
"""
Checks that nodejs selector is installed
Installed if:
- ea-apache24-mod-alt-passenger or alt-mod-passenger is installed
- At least one version is installed
:rtype: bool
"""
return is_clpassenger_active() and bool(installed_interpreters_list('nodejs'))
def is_php_selector_installed():
"""
Checks that php selector is installed
Installed if:
- CageFS is initialized
:rtype: bool
"""
return bool(check_cagefs_initialized())
def get_packages_with_lve_extensions():
"""
Gets packages with set lve limits via extension
"""
try:
result = WhmApiRequest('listpkgs').call()
except WhmApiError:
return []
lve_extensions_packages = [item['name'] for item in result['pkg']
if '_PACKAGE_EXTENSIONS' in item and item['_PACKAGE_EXTENSIONS'] == 'lve']
return lve_extensions_packages
def get_client_data_from_jwt_token(check_expiration=True) -> Optional[Dict[AnyStr, Union[AnyStr, bool]]]:
"""
Gets (if any) fields cl_plus and client_id from jwt token
:return: decoded jwt_token value, None if error
jwt_token: result of the successful decoding
"""
try:
with open(DEFAULT_JWT_ES_TOKEN_PATH, mode='rb') as file:
file_content = file.read().strip()
except (OSError, IOError):
return None
# JWT read success
try:
jwt_token = decode(file_content, algorithms=['HS256'],
options={'require_exp': True,
"verify_exp": check_expiration,
"verify_iss": True,
'verify_signature': False},
issuer='CloudLinux')
return jwt_token
# JWT format error
except exceptions.PyJWTError:
return None
def is_active_cloudlinux_license(token_data):
"""
Checks whether license ts is expired
"""
if not token_data:
return None
if not token_data.get('exp'):
return None
return int(time.time()) < int(token_data.get('exp'))
def get_cl_plus_sender_status() -> Optional[AnyStr]:
"""
Retrieves data from status of cl_plus_sender service
:return: status of service, Optional[AnyStr]
- 'active'
- 'inactive'
"""
try:
result = process_is_running(CL_PLUS_SENDER_FILE_PATH, False)
except FileNotFoundError:
result = False
return 'active' if result else 'inactive'