# coding=utf-8
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
from __future__ import absolute_import
from __future__ import print_function
from __future__ import division
import json
import os
import re
from abc import ABCMeta, abstractmethod
from datetime import datetime
import clselect.clpassenger as clpassenger
import secureio
from future.utils import iteritems
from past.builtins import unicode
from clcommon import ClPwd
from clcommon.utils import get_file_lines, write_file_lines
from clselect.utils import pretty_json, delete_using_realpath_keys, get_abs_rel
from clselect import ClSelectExcept
from clselect.baseclselect import (
BaseSelectorError,
AbsentFileError,
MissingVirtualenvError,
MissingAppRootError
)
from .selector_manager import BaseSelectorManager # NOQA
from clselect.utils import file_readlines, file_writelines
from clselect.utils import get_using_realpath_keys
from future.utils import with_metaclass
class BaseApplicationsManager(with_metaclass(ABCMeta, object)):
    """
    Base class responsible for gathering and setting information about
    applications.

    Child classes override the class attributes below and implement
    :meth:`get_interpreter_specific_passenger_lines`.
    """

    # needs to be overridden in child class;
    # file name of the per-user json config, joined to the user's home directory
    _USER_CONFIG = None
    # log file used by write_string_to_log()
    _LOG_FILE_NAME = '/var/log/selectorctl.log'
    # interpreter name; used to build the '<INTERPRETER>_version' config key
    INTERPRETER = None
    # directory (relative to user's home) that holds application environments
    VENV_DIR = None
    # default binary name returned by get_binary_path()
    BINARY_NAME = None

    def __init__(self, manager):
        """
        :param manager: selector manager; its ``pkg`` attribute and
            ``get_summary()`` method are used by this class
        """
        self._manager = manager  # type: BaseSelectorManager
        self._pwd = ClPwd()

    @classmethod
    def write_string_to_log(cls, log_str):
        """
        Writes string (prefixed with current date and time) to log file
        :param log_str: String to write
        :return: None
        """
        try:
            dt_string = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            write_file_lines(cls._LOG_FILE_NAME, ['%s: %s\n' % (dt_string, log_str)], 'a')
        except (IOError, OSError):
            # logging is best-effort: failure to write the log
            # must not break the calling operation
            pass

    def add_app_to_config(self, user, app_directory, app_data):
        """
        Add data to user's config (update info for an app with app_directory specified)
        :param str user: name of unix user
        :param str app_directory: Application root directory
        :param dict app_data: Application data
        :return: None
        """
        current_dict = self.get_user_config_data(user)
        current_dict[app_directory] = app_data
        # Write new config to file
        self.write_full_user_config_data(user, current_dict)

    def add_env_vars_for_htaccess(self, user_name, app_directory, env_vars, doc_root):
        """
        Add environment variables to .htaccess file for LVEMAN-1623.
        Previously written section (if any) is removed first. Empty lines
        of the file are dropped while rewriting it.
        :param str user_name: Name of unix user
        :param str app_directory: Application root directory
        :param dict env_vars: Dict of environment variables;
                              None - only remove the existing section
        :param str doc_root: doc root of application
        :return: None
        """
        env_section_begin = "# DO NOT REMOVE OR MODIFY. CLOUDLINUX ENV VARS CONFIGURATION BEGIN"
        env_section_end = "# DO NOT REMOVE OR MODIFY. CLOUDLINUX ENV VARS CONFIGURATION END"
        htaccess_file = self.get_htaccess_by_appdir(user_name, app_directory, doc_root, None)
        if htaccess_file is None:
            # application (or its uri) is absent in config - nothing to update
            return
        with open(htaccess_file, "r", errors='surrogateescape') as htaccess:
            lines = [line for line in htaccess.read().split('\n') if line]
        # drop the old section including both marker lines
        if env_section_begin in lines and env_section_end in lines:
            start = lines.index(env_section_begin)
            end = lines.index(env_section_end)
            del lines[start: end + 1]
        if env_vars is not None:
            lines.append(env_section_begin)
            lines.append("<IfModule Litespeed>")
            for env_var, value in env_vars.items():
                lines.append("SetEnv {0} {1}".format(env_var, value))
            lines.append("</IfModule>")
            lines.append(env_section_end)
        with open(htaccess_file, "w", errors='surrogateescape') as htaccess:
            htaccess.write('\n'.join(lines))

    def remove_app_from_config(self, user, app_dir):
        """
        Removes application from config
        :param user: User name
        :param app_dir: Application directory
        :return: True if app exists in config,
                 False - if app does not exist in config
        """
        config_data = self.get_user_config_data(user)
        try:
            delete_using_realpath_keys(user, app_dir, config_data)
        except KeyError:
            return False
        # write new config; kept outside of `try` so that a KeyError raised
        # while saving is not reported as "application not found"
        self.write_full_user_config_data(user, config_data)
        return True

    def replace_domain_in_configs(self, username, domain, new_domain, include_subdomains=False):
        """
        Replace domain in config files when it is renamed.
        :param username: domain owner
        :param domain: previous name
        :param new_domain: name after rename
        :param include_subdomains: whether we should also rename subdomains
        :return: None
        """
        full_config = self.get_user_config_data(username)
        if not full_config:
            return
        # domain is a literal string, not a pattern: escape it, otherwise
        # e.g. the dots in it would match any character
        escaped_domain = re.escape(domain)
        if include_subdomains:
            domain_regex = re.compile(r'(\.|^)%s$' % escaped_domain)
        else:
            domain_regex = re.compile(r'^%s$' % escaped_domain)
        # find applications with domain
        for config in full_config.values():
            if domain_regex.search(config['domain']) is not None:
                config['domain'] = unicode('{}{}'.format(
                    # Cut out old_domain, keep subdomain prefix (if any)
                    config['domain'][:-len(domain)],
                    new_domain))
        self.write_full_user_config_data(username, full_config)

    @staticmethod
    def _find_config_files(user_name, app_directory, patterns=None):
        """
        Return list of detected config files
        :param user_name: name of unix user
        :param app_directory: Application root directory
        :param patterns: iterable of file names (relative to application
                         directory) to look for; None - nothing to look for
        :return: list of names from `patterns` that exist in application directory
        """
        abs_path = get_abs_rel(user_name, app_directory)[0]
        # default value None must behave as "no patterns" instead of
        # failing with TypeError on iteration
        return [config for config in (patterns or ())
                if os.path.exists(os.path.join(abs_path, config))]

    def update_htaccess_file(self, user_name, app_root, doc_root):
        """
        Creates .htaccess file for application based on application config data.
        Passenger section is put at the top of the file, existing lines follow it.
        :param str user_name: User's name
        :param str app_root: Application root
        :param doc_root: Document root for the domain
        :return: None
        :raises ClSelectExcept.ConfigMissingError: application is absent in user's config
        """
        app_config = self.get_app_config(user_name, app_root)
        if app_config is None:
            # fail with a clear error instead of TypeError on `None[...]`
            raise ClSelectExcept.ConfigMissingError('Application config is missed '
                                                    'for user: {}'.format(user_name))
        user_dir = self._pwd.get_homedir(user_name)
        htaccess_file = self.get_htaccess_by_appdir(user_name, app_root, doc_root, app_config)
        new_lines = [
            '{}\n'.format(clpassenger.HTACCESS_BEGIN),
            'PassengerAppRoot "{}"\n'.format(os.path.join(user_dir, app_root)),
            'PassengerBaseURI "/{}"\n'.format(app_config['app_uri']),
        ]
        new_lines.extend(self.get_interpreter_specific_passenger_lines(
            self.get_binary_path(user_name, app_root, user_dir),
            app_config))
        passenger_log_file = app_config.get('passenger_log_file', None)
        if passenger_log_file:
            new_lines.append('PassengerAppLogFile "%s"\n' % passenger_log_file)
        new_lines.append(clpassenger.HTACCESS_END + '\n')
        # Append all existing lines
        new_lines.extend(file_readlines(htaccess_file, errors='surrogateescape'))
        new_lines = clpassenger.rm_double_empty_lines(new_lines)
        # write new .htaccess
        file_writelines(htaccess_file, new_lines, 'w', errors='surrogateescape')

    @staticmethod
    @abstractmethod
    def get_interpreter_specific_passenger_lines(binary_path, app_config):
        """
        Return list of lines that needs to be added to htaccess and are
        specific to the interpreter
        :param binary_path: path to interpreter binary of application
        :param app_config: application config data
        """
        raise NotImplementedError

    def get_binary_path(self, user, app_root, user_dir, binary_name=None):
        """
        Return a path to the environment's interpreter binary
        Get interpreter path for application
        :param user: owner of the application
        :param app_root: app path relative to user home (app-root)
        :param user_dir: User's home directory
        :param binary_name: name of binary in virtual environment (python, npm, node);
                            None - use BINARY_NAME of the class
        :return: path to interpreter binary in virtual environment
        :raises ClSelectExcept.ConfigMissingError: application is absent in user's config
        """
        version = self.get_interpreter_version_for_app(user, app_root)
        if binary_name is None:
            binary_name = self.BINARY_NAME
        return os.path.join(user_dir, self.VENV_DIR, app_root, version, 'bin', binary_name)

    def get_users_dict(self, username=None):
        """
        Retrieves info about user(s).
        :param str | None username: Username to retrieve information.
                                    None - retrieve information about all users
        :return: Dictionary with user info. Example:
          {'user1': pw_struct}
        """
        if username is None:
            return self._pwd.get_user_dict()
        return {username: self._pwd.get_pw_by_name(username)}

    def get_user_config_data(self, user):
        """
        Get all data from user's config
        :param user: name of unix user
        :return: json data from user's config as dictionary;
                 empty dictionary if config file does not exist
        :raises ClSelectExcept.WrongData: config content is not a valid json
        """
        _user_config_data = {}
        user_config = self._get_path_to_user_config(user)
        if os.path.isfile(user_config):
            data = get_file_lines(user_config)
            joined_data = ''.join(data)
            try:
                _user_config_data = json.loads(joined_data)
            except (ValueError, TypeError):
                raise ClSelectExcept.WrongData('User config "{}" is broken'.format(self._USER_CONFIG))
        return _user_config_data

    def get_app_config(self, user, app_dir):
        """
        Retrieves full application config
        :param user: User name
        :param app_dir: Application directory
        :return: Application data as dictionary
            If None - No application data found in config
        """
        user_config_data = self.get_user_config_data(user)
        try:
            return get_using_realpath_keys(user, app_dir, user_config_data)
        except KeyError:
            return None

    def get_app_domain(self, username, app_directory):
        """
        Retrieves domain for provided user's application.
        The application is expected to exist in user's config.
        :param username: user name
        :param app_directory: application root directory
        :return str: application domain
        """
        app_config = self.get_app_config(username, app_directory)
        return app_config['domain']

    def get_app_uri(self, username, app_directory):
        """
        Retrieves uri for provided user's application.
        The application is expected to exist in user's config.
        :param username: user name
        :param app_directory: application root directory
        :return str: application uri
        """
        app_data = self.get_app_config(username, app_directory)
        return app_data['app_uri']

    def get_app_startup_file(self, username, app_directory):
        """
        Retrieves name of startup file for provided user's application.
        The application is expected to exist in user's config.
        :param username: user name
        :param app_directory: application root directory
        :return str: name of startup file of application
        """
        app_data = self.get_app_config(username, app_directory)
        return app_data['startup_file']

    def get_app_status(self, username, app_directory):
        """
        Retrieves status for provided user's app_directory.
        The application is expected to exist in user's config.
        :param username: user name
        :param app_directory: application root directory
        :return str: status of application
        """
        app_data = self.get_app_config(username, app_directory)
        return app_data['app_status']

    def get_interpreter_version_for_app(self, username, app_directory):
        """
        Retrieves interpreter version for provided user and application
        :param username: user name
        :param app_directory: application root directory
        :return str: interpreter version stored in application config
        :raises ClSelectExcept.ConfigMissingError: application is absent in user's config
        """
        app_data = self.get_app_config(username, app_directory)
        if app_data is None:
            raise ClSelectExcept.ConfigMissingError('Application config is missed '
                                                    'for user: {}'.format(username))
        return app_data['%s_version' % self.INTERPRETER]

    def _get_path_to_user_config(self, username):
        """
        Get full path to user config ~/${_USER_CONFIG}
        :param username: name of unix user
        :return: full path to config
        """
        user_home = self._pwd.get_homedir(username)
        application_config = os.path.join(user_home, self._USER_CONFIG)
        return application_config

    def _get_full_version_for_short(self, major_version):
        """
        Retrieves full version for supplied major version.
        :param major_version: major interpreter version
        :return: str - Full version or None if full version not found
            (package manager returned the major version unchanged)
        """
        full_version = self._manager.pkg.get_full_version(major_version)
        if full_version != major_version:
            return full_version
        return None

    def _add_single_user_app(self, users_data_dest_dict, user_pw_entry, app_root_dir, app_data):
        """
        Add single application data to user_data_dest_dict
        :param users_data_dest_dict: Destination dictionary with application data
        :param user_pw_entry: User's passwd entry
        :param app_root_dir: Application root directory
        :param app_data: Application data
        :return: None
        """
        if user_pw_entry.pw_name in users_data_dest_dict:
            # User already present in dict
            users_data_dest_dict[user_pw_entry.pw_name]["applications"][app_root_dir] = app_data
        else:
            # No such user in dict - add it
            user_data = {
                "homedir": user_pw_entry.pw_dir,
                "applications": {app_root_dir: app_data}
            }
            users_data_dest_dict[user_pw_entry.pw_name] = user_data

    def _add_detected_config_files_to_application(self, app_data, app_root_dir, user_pw_entry):
        """
        Add automatically-detected config files to user-defined list.
        Duplicates are removed; user-defined files go first and the original
        order is kept, so the result is deterministic.
        :param app_data: Application data (modified in place)
        :param app_root_dir: Application root directory
        :param user_pw_entry: User's passwd entry
        :return: None
        """
        config_files_detected = self._find_config_files(user_pw_entry.pw_name, app_root_dir)
        # 'config_files' key may be absent in config - treat it as empty list
        user_defined_files = app_data.get('config_files') or []
        merged_files = []
        for config_file in list(user_defined_files) + list(config_files_detected):
            if config_file not in merged_files:
                merged_files.append(config_file)
        app_data['config_files'] = merged_files

    def _add_all_user_apps(self, user_pw_entry, user_data_dest_dict, user_app_data):
        """
        Add all user's apps information to user_data_dest_dict
        :param user_pw_entry: User's passwd entry
        :param user_data_dest_dict: Destination dictionary with application data
            (keys are full interpreter versions)
        :param user_app_data: User's application data ([node|python|ruby]-selector.json file content as dictionary).
        :return: None
        """
        version_key = '%s_version' % self.INTERPRETER
        for app_root_dir, app_data in iteritems(user_app_data):
            if version_key not in app_data:
                # Broken application entry without interpreter version - skip it
                # instead of failing the whole listing with KeyError
                continue
            full_interpreter_version = self._get_full_version_for_short(app_data[version_key])
            if full_interpreter_version is None or full_interpreter_version not in user_data_dest_dict:
                # Application's interpreter version is absent in available versions of
                # interpreters - skip application
                continue
            # We added section `users` because version has at least one application
            if 'users' not in user_data_dest_dict[full_interpreter_version]:
                user_data_dest_dict[full_interpreter_version]['users'] = {}
            # Manually add venv path to config in order to display it in lvemanager;
            # must be done before the version key is removed below, because
            # get_binary_path() reads the version from the config file
            app_data['virtualenv'] = {
                'activate_path': self.get_binary_path(
                    user_pw_entry.pw_name, app_root_dir, user_pw_entry.pw_dir,
                    binary_name='activate')
            }
            # we do not need this key (why?)
            del app_data[version_key]
            users_dict = user_data_dest_dict[full_interpreter_version]['users']
            self._add_detected_config_files_to_application(app_data, app_root_dir, user_pw_entry)
            self._add_single_user_app(users_dict, user_pw_entry, app_root_dir, app_data)

    @staticmethod
    def _add_absent_passenger_log_file(user_config_dict):
        """
        Append absent 'passenger_log_file' key with None value to each application
        :param user_config_dict: Source dictionary to modify (modified in place)
        :return: Modified dict with 'passenger_log_file' keys
        """
        for app_data in user_config_dict.values():
            app_data.setdefault('passenger_log_file', None)
        return user_config_dict

    def read_user_selector_config_json(self, user_homedir, uid, gid, euid=None):
        """
        Read [python|ruby|node]-selector.json file from user's directory.
        :param euid: current effective uid; None - detect it with os.geteuid()
        :param user_homedir: user's home directory
        :param uid: uid for drop rights
        :param gid: gid for drop rights
        :return: File contents as dictionary; 'passenger_log_file' key is added
            (with None value) to each application that lacks it
        :raises AbsentFileError: there is no [python|ruby|node]-selector.json file in user's directory
        :raises BaseSelectorError if error. Exception contents:
                        {'message': "File %(file)s read/parse error: %(error)s",
                        'context': {'file': node_json_path, 'error': 'some message'}}
        """
        # don't do this like euid=os.geteuid() in method signature!
        if euid is None:
            euid = os.geteuid()
        json_config_path = os.path.join(user_homedir, self._USER_CONFIG)
        if not os.path.exists(json_config_path):
            raise AbsentFileError(json_config_path)
        try:
            if euid == 0:
                # reads file with drop rights
                # to prevent print error messages from secureio.read_file_secure directly to stdout
                # NOTE: the flag is module-level and is not restored afterwards
                secureio.SILENT_FLAG = True
                file_lines = secureio.read_file_secure(json_config_path, uid, gid, exit_on_error=False, write_log=False)
            else:
                # read file without dropping rights
                file_lines = get_file_lines(json_config_path)
            return self._add_absent_passenger_log_file(json.loads(''.join(file_lines)))
        except (IOError, OSError, TypeError, ValueError) as e:
            # [python|node|ruby]-selector.json is unreadable or have non-json format
            raise BaseSelectorError({'message': "File %(file)s read/parse error: %(error)s",
                                     'context': {'file': json_config_path, 'error': str(e)}})

    def get_htaccess_by_appdir(self, user, app_dir, doc_root, app_config=None):
        """
        Retrieve .htaccess for user and app
        :param user: Username
        :param app_dir: App dir
        :param doc_root: Document root for selected domain
        :param app_config: Optional app configuration;
            None - read it from user's config
        :return: .htaccess full path;
            None - application or its 'app_uri' is absent in config
        """
        if app_config is None:
            app_config = self.get_app_config(user, app_dir)
        if app_config is None:
            # application is absent in user's config; without this check
            # subscripting None raises TypeError that callers do not expect
            return None
        try:
            return os.path.join(doc_root, app_config['app_uri'], '.htaccess')
        except KeyError:
            return None

    def set_app_status(self, user, app_dir, new_status):
        """
        Sets application status in user's config.
        Does nothing if application is absent in config.
        :param user: User name
        :param app_dir: Application directory
        :param new_status: New application status
        :type new_status: str
        :return: None
        """
        user_config_data = self.get_user_config_data(user)
        try:
            app_config = get_using_realpath_keys(user, app_dir, user_config_data)
            app_config['app_status'] = new_status
        except KeyError:
            return
        # Write new config to file
        self.write_full_user_config_data(user, user_config_data)

    def write_full_user_config_data(self, user, config_data):
        """
        Write data to user's config
        :param user: name of unix user
        :param config_data: data to be dumped to json
        :return: None
        :raises ClSelectExcept.SelectorException: called with root effective uid or gid
        :raises ClSelectExcept.UnableToSaveData: config file can not be written
        """
        json_config_path = self._get_path_to_user_config(user)
        if os.geteuid() == 0 or os.getegid() == 0:
            raise ClSelectExcept.SelectorException(
                'root should not write to user config')
        dumped_data = pretty_json(config_data)
        try:
            secureio.write_file_via_tempfile(
                content=dumped_data,
                dest_path=json_config_path,
                perm=0o644,
                suffix='_tmp',
            )
        except (IOError, OSError, TypeError) as e:
            raise ClSelectExcept.UnableToSaveData(
                json_config_path,
                'Could not write json user config ({})'.format(e))

    def is_version_in_use(self, version):
        """
        Returns True if specified interpreter version is in use by any app of any user.
        It will stop search on the first match.
        Unreadable or broken user configs are written to log and skipped.
        :param version: interpreter version as it is stored in user's config
        :return True | False
        """
        version_key = '%s_version' % self.INTERPRETER
        for user_pw_entry in self.get_users_dict().values():
            try:
                # Get user's [python|ruby|node]-selector.json file as dictionary
                user_apps_data = self.read_user_selector_config_json(user_pw_entry.pw_dir, user_pw_entry.pw_uid,
                                                                     user_pw_entry.pw_gid)
                if user_apps_data is not None:
                    for app in user_apps_data.values():  # pylint: disable=E1101
                        if app.get(version_key) == version:
                            return True
            except AbsentFileError:
                # just skip this error, as it means than config file
                # is not created yet or it was removed by user
                pass
            except BaseSelectorError as e:
                self.write_string_to_log(e.message % e.context)  # pylint: disable=exception-message-attribute
        return False

    def get_applications_users_info(self, user=None):
        """
        Retrieves info about all installed interpreters and user(s) applications
        :param user: User name for read applications. None - all users will be processed.
        :return: Dictionary with user(s) applications info. When called as root
            and some user's config can not be read, 'warning' key is added
            and the error is written to log
        :raises ClSelectExcept.NoSuchUser or BaseSelectorError
        """
        try:
            user_info = self.get_users_dict(user)
        except secureio.clpwd.NoSuchUserException:
            raise ClSelectExcept.NoSuchUser(user)
        users_apps_info = self._get_interpreter_info()
        available_versions = users_apps_info['available_versions']
        # Process all needed users
        for user_pw_entry in user_info.values():
            try:
                # Get user's [python|ruby|node]-selector.json file as dictionary
                user_apps_data = self.read_user_selector_config_json(user_pw_entry.pw_dir, user_pw_entry.pw_uid,
                                                                     user_pw_entry.pw_gid)
                # User applications data was read successfully - process it
                self._add_all_user_apps(user_pw_entry, available_versions, user_apps_data)
            except AbsentFileError:
                # just skip this error, as it means than config file
                # is not created yet or it was removed by user
                pass
            except BaseSelectorError as e:
                # Error retrieving data from user's [python|ruby|node]-selector.json
                if os.geteuid() == 0:
                    # we are root - write message to log
                    # TODO: add errors logging of broken configs
                    self.write_string_to_log(e.message % e.context)  # pylint: disable=exception-message-attribute
                    users_apps_info['warning'] = (
                        'Some user\'s %s can\'t be read. '
                        'Some user(s) data absent in output. Please see file %s for details'
                        % (self._USER_CONFIG, self._LOG_FILE_NAME))
                else:
                    # we are user - show exception
                    raise
        return users_apps_info

    def get_app_folders(self, username, app_root, chk_env=True, chk_app_root=True):
        """
        Calculate, check exists and return application folders
        This method does not check that application exists in config.
        :param username: user name
        :param app_root: app path relative to user home
        :param chk_env: check that virtual environment directory exists
        :param chk_app_root: check that application root directory exists
        :raises: NoSuchUserException, MissingVirtualenvError, MissingAppRootError
        :return: tuple(app_root, app_venv) with absolute paths
        """
        user_home = self._pwd.get_pw_by_name(username).pw_dir
        app_venv = os.path.join(user_home, self.VENV_DIR, app_root)
        if chk_env and not os.path.exists(app_venv):
            raise MissingVirtualenvError(app_venv)
        app_root = os.path.join(user_home, app_root)
        if chk_app_root and not os.path.exists(app_root):
            raise MissingAppRootError(app_root)
        return app_root, app_venv

    def _get_interpreter_info(self):
        """Get initial information about interpreter (summary of selector manager)"""
        users_apps_info = self._manager.get_summary()
        return users_apps_info

    def acquire_interpreter_lock(self, it_version):
        """
        Just a public proxy to internal method that blocks
        any actions with interpreter
        :param it_version: interpreter version to lock
        """
        return self._manager.pkg.acquire_interpreter_lock(it_version)