# -*- coding: utf-8 -*-
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2021 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
"""
This module contains a config parser for cloudlinux-xray-autotracing
"""
import logging
import os
import pwd
from configparser import ConfigParser
from dataclasses import dataclass
from enum import Enum
from typing import Any, Union, Tuple, Iterator, List, Optional
from clcommon.clpwd import ClPwd
from clcommon.const import Feature
from clcommon.cpapi import cpusers, is_panel_feature_supported
from clcommon.cpapi.cpapiexceptions import CPAPIException
from ..internal.constants import flag_file
from ..internal.exceptions import SSAError
from ..internal.utils import (
umask_0,
set_privileges,
is_xray_user_agent_active,
xray_version,
is_kernel_version_supported
)
logger = logging.getLogger('autotracing.config')
def is_autotracing_supported() -> bool:
"""Currently Auto tracing feature is not supported on Shared edition"""
return is_panel_feature_supported(Feature.AUTOTRACING)
class Status(Enum):
"""
Autotracing statuses
"""
ENABLED = 'enabled'
DISABLED = 'disabled'
@dataclass
class User:
"""
User container
"""
uid: int
name: str
home: str
class AutotracingConfig(ConfigParser):
"""
Autotracing basic config parser
"""
main_section = 'conf'
def check_config_dir(self) -> None:
"""
If subdirectory location for autotracing config file does not exist,
create it
"""
subdir_path = os.path.dirname(self.config_file)
if not os.path.exists(subdir_path):
os.mkdir(subdir_path)
def set_config_value(self, key: Any, value: Any) -> None:
"""
Set given config item 'key' to given value 'value'
"""
self[self.main_section][key] = value
self.check_config_dir()
with open(self.config_file, 'w') as configfile:
self.write(configfile)
def get_config_value(self, key: Any) -> Any:
"""
Set given config item 'key' to given value 'value'
"""
self.read(self.config_file)
return self[self.main_section][key]
def set_status(self, value: Any) -> None:
"""
Set given status
"""
self.set_config_value('status', value)
def get_status(self) -> Any:
"""
Set given status
"""
return self.get_config_value('status')
class AdminLevelConfig(AutotracingConfig):
"""Admin level autotracing config"""
def __init__(self):
defaults = {
'status': 'disabled'
}
self.config_file = '/usr/share/clos_ssa/autotracing'
super().__init__(defaults, default_section=self.main_section,
strict=False)
class UserLevelConfig(AutotracingConfig):
"""User level autotracing config"""
def __init__(self, configpath: str):
defaults = {
'status': AdminLevelConfig().get_status()
}
self.config_file = f'{configpath}/.ssa/autotracing'
super().__init__(defaults, default_section=self.main_section,
strict=False)
def who_am_i() -> User:
"""
Get current user and his details
"""
pw_entry = pwd.getpwuid(os.getuid())
return User(pw_entry.pw_uid, pw_entry.pw_name, pw_entry.pw_dir)
def config_instance(user_home: str = None) -> Union[AdminLevelConfig, UserLevelConfig]:
"""
Initialize correct config file instance depending on context
"""
current_user = who_am_i()
if current_user.uid == 0:
# in Admin mode: globally or for particular user
if user_home:
# for a particular user
conf_instance = UserLevelConfig(user_home)
else:
# globally
conf_instance = AdminLevelConfig()
else:
# in User mode: user's config only
if is_xray_user_agent_active():
conf_instance = UserLevelConfig(current_user.home)
else:
# if no X-Ray App available, do not allow manipulations
raise SSAError(
'Auto tracing management is not available. Reason: X-Ray End-User plugin is not enabled, please contact your system administrator for help.')
return conf_instance
def set_config_value(value: str, user: str = None) -> None:
"""
"""
if user:
# try to modify user's config with dropping privileges
try:
pw_data = pwd.getpwnam(user)
except KeyError as e:
raise SSAError(f"User '{user}' not found") from e
try:
with set_privileges(target_uid=pw_data.pw_uid,
target_gid=pw_data.pw_gid):
config_instance(pw_data.pw_dir).set_status(value)
except PermissionError as e:
raise SSAError(e.strerror) from e
else:
with umask_0(0o022):
# remove write for group
config_instance().set_status(value)
def enable(username: str = None, mode_all: bool = False) -> None:
"""
Enable autotracing.
If username is given, the user's config is changed in Admin's mode.
Perform some misconfiguration checks before enabling and
do not enable if some of them appear
"""
try:
misconfiguration_checks()
except SSAError as e:
issue = e.reason
else:
issue = None
if mode_all and username is None:
remove_custom_users_configs()
set_config_value(Status.ENABLED.value, username)
return issue
def disable(username: str = None, mode_all: bool = False) -> None:
"""
Disable autotracing.
If username is given, the user's config is changed in Admin's mode
"""
if mode_all and username is None:
remove_custom_users_configs()
set_config_value(Status.DISABLED.value, username)
def status(username: str = None) -> Optional[Tuple[str, Optional[str]]]:
"""
Get status of autotracing.
If username is given, the status for a particular user is returned
"""
try:
misconfiguration_checks()
except SSAError as e:
issue = e.reason
else:
issue = None
if username is not None:
try:
return UserLevelConfig(
ClPwd().get_homedir(username)
).get_status(), None
except ClPwd.NoSuchUserException as e:
raise SSAError(str(e)) from e
return AdminLevelConfig().get_status(), issue
def _panel_users() -> Tuple:
"""
Get panel users via cpapi, ignoring exceptions like NotSupported, etc.
"""
try:
return cpusers()
except CPAPIException:
return tuple()
def user_configs() -> Iterator[Tuple[str, UserLevelConfig]]:
"""
Iterator over all users on the server along with their autotracing configs
"""
for user in _panel_users():
try:
_homedir = ClPwd().get_homedir(user)
except ClPwd.NoSuchUserException:
continue
yield user, UserLevelConfig(_homedir)
def disabled_users() -> List[str]:
"""Get list of disabled users"""
return [username for username, userconf in user_configs() if
userconf.get_status() == Status.DISABLED.value]
def remove_custom_users_configs() -> None:
"""
Remove custom users configurations
"""
for user, user_config_path in user_configs():
pw_data = pwd.getpwnam(user)
try:
with set_privileges(target_uid=pw_data.pw_uid,
target_gid=pw_data.pw_gid):
# if config is actually exists
if os.path.isfile(user_config_path.config_file):
os.remove(user_config_path.config_file)
os.rmdir(os.path.dirname(user_config_path.config_file))
except PermissionError as e:
raise SSAError(e.strerror) from e
def misconfiguration_checks() -> None:
"""Additional checks for known malfunctions"""
def make_error(reason: str, fix: str) -> SSAError:
message = f'{reason}. You should {fix} in order to get Auto Tracing work'
return SSAError(message, flag='warning')
# check for edition
if not is_autotracing_supported():
raise make_error(
'Your current server setup is unsupported by Auto Tracing feature',
'switch Control Panel or CloudLinux edition',
)
# check of IO throttling availability
if not is_kernel_version_supported():
raise make_error(
'Your kernel does not support throttling detection',
'update the kernel',
)
# check if X-Ray is installed
if xray_version() is None:
raise make_error('X-Ray is not installed', 'install X-Ray')
# check if SSA is enabled
if not os.path.isfile(flag_file):
raise make_error('Slow Site Analyzer is disabled', 'enable it')