# -*- coding: utf-8 -*-
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
from __future__ import absolute_import
from __future__ import print_function
from __future__ import division
import os
import pwd
from distutils.version import StrictVersion
from clselect.clselectctl import get_directory
from clselect.utils import check_call, check_output, list_dirs
from .extensions import EXTENSION_PATTERN, ExtensionInfo
from .interpreters import Interpreter
# Directory (relative to the user's home) used for environments when the
# caller does not supply a prefix.
DEFAULT_PREFIX = 'rubyvenv'
# Path of the rubyvenv.py script that sits next to this module; it is the
# script Environment.create() executes.
RUBYVENV_BIN = os.path.join(os.path.dirname(__file__), 'rubyvenv.py')
# Separator between a gem name and an optional version in the extension
# strings handled by extension_install() / extension_uninstall().
VERSION_DELIMITER = '#'
# Python interpreter used to run RUBYVENV_BIN.
PYTHON_PATH = '/opt/cloudlinux/venv/bin/python3'
class Environment(object):
    """A per-user Ruby environment stored at ``<home>/<prefix>/<name>``.

    ``name`` is also treated as the Ruby version: it is compared as a
    version in :meth:`extension_install` and appended to ``'ruby'`` to form
    ``interpreter_name``.  Every external command is started with a
    ``preexec_fn`` that switches the child process to ``self.user``.
    """

    def __init__(self, name, user=None, prefix=None):
        """Record the environment identity and derive its filesystem path.

        :param name: environment directory name (also the Ruby version)
        :param user: owning user name; a falsy value selects the user that
            runs the current process
        :param prefix: directory under the user's home; ``None`` selects
            ``DEFAULT_PREFIX``
        """
        self.name = name
        self.user = user if user else pwd.getpwuid(os.getuid()).pw_name
        self.prefix = DEFAULT_PREFIX if prefix is None else prefix
        self.path = os.path.join(_abs_prefix(self.user, self.prefix), name)
        # Both are filled lazily by interpreter() and gem().
        self._interpreter = None
        self._gem = None
        self.interpreter_name = 'ruby' + name

    def __repr__(self):
        return ("%s.%s(name='%s', user='%s', prefix='%s')" % (
            self.__class__.__module__, self.__class__.__name__,
            self.name, self.user, self.prefix))

    def _demote(self):
        """Return a ``preexec_fn`` that switches the child to ``self.user``.

        The passwd entry is looked up here, in the calling process, so an
        unknown user raises ``KeyError`` before any child is started.
        """
        user_pwd = pwd.getpwnam(self.user)

        def func():
            # setgid/setuid alone leave the supplementary group list
            # inherited from the parent untouched.  Replacing it needs
            # privileges, so it is only attempted when running as root;
            # an unprivileged caller behaves exactly as before.
            if os.geteuid() == 0:
                os.initgroups(user_pwd.pw_name, user_pwd.pw_gid)
            # Group first: once the uid is dropped, changing the gid is no
            # longer permitted.
            os.setgid(user_pwd.pw_gid)
            os.setuid(user_pwd.pw_uid)
        return func

    def _keyed(self, data, key):
        """Re-index ``data`` by the attribute ``key`` when a key is given.

        With a truthy ``key`` the entry is removed from ``data`` and the
        result is ``{getattr(self, key): data}``; otherwise ``data`` is
        returned unchanged.
        """
        if key:
            del data[key]
            return {getattr(self, key): data}
        return data

    @staticmethod
    def _split_extension(spec):
        """Split ``name#version`` into ``(name, version)``.

        ``version`` is ``None`` when the delimiter is absent.  Only the
        first field after the name is used as the version.
        """
        parts = spec.split(VERSION_DELIMITER)
        return parts[0], (parts[1] if len(parts) > 1 else None)

    def as_dict(self, key=None):
        """Describe the environment as a dict.

        :param key: optional name of an entry ('name', 'interpreter' or
            'extensions') to remove and use as the single outer key
        :return: ``{'name', 'interpreter', 'extensions'}`` or, with ``key``,
            ``{<attribute value>: <remaining entries>}``
        """
        return self._keyed({
            'name': self.name,
            'interpreter': self.interpreter(),
            'extensions': self.extensions(),
        }, key)

    def as_deepdict(self, key=None):
        """Like :meth:`as_dict`, but the interpreter is given as a dict."""
        return self._keyed({
            'name': self.name,
            'interpreter': self.interpreter().as_dict(),
            'extensions': self.extensions(),
        }, key)

    def create(self, interpreter=None):
        """Create the environment on disk by running ``RUBYVENV_BIN``.

        :param interpreter: interpreter object whose ``binary`` is passed
            as ``--ruby``; a default ``Interpreter`` for the user is built
            when omitted
        """
        if not interpreter:
            interpreter = Interpreter(target_user=self.user)
        directory = get_directory(os.path.basename(self.prefix))
        prompt = '(' + directory + ':' + self.name + ')'
        check_call(
            PYTHON_PATH, RUBYVENV_BIN,
            '--prompt', prompt,
            '--ruby', interpreter.binary,
            self.path,
            preexec_fn=self._demote())

    def destroy(self):
        """Recursively remove the environment directory as the owning user."""
        check_call('/bin/rm', '-r', '--interactive=never', self.path,
                   preexec_fn=self._demote())

    def exists(self):
        """Return True when the environment path exists."""
        return os.path.exists(self.path)

    def interpreter(self):
        """Return the cached ``Interpreter`` bound to this environment."""
        if not self._interpreter:
            self._interpreter = Interpreter(prefix=self.path,
                                            target_user=self.user)
        return self._interpreter

    def gem(self):
        """Return the cached path of the environment's ``bin/gem``."""
        if not self._gem:
            self._gem = os.path.join(self.path, 'bin', 'gem')
        return self._gem

    def extension_install(self, extension):
        """Install a gem, optionally pinned as ``name#version``.

        :raises ValueError: when the extension is reported as locked
        """
        locked_extensions = ExtensionInfo.get_locked_extensions(
            self.interpreter_name)
        extension, version = self._split_extension(extension)
        # From 2.6 on the single '--no-document' flag is passed instead of
        # the '--no-rdoc' / '--no-ri' pair.
        if StrictVersion(self.name) >= StrictVersion('2.6'):
            command = (self.gem(), 'install', '--no-document', extension)
        else:
            command = (self.gem(), 'install', '--no-rdoc', '--no-ri',
                       extension)
        if version is not None:
            command += ('-v', version)
        # The lock check receives '' when no version was requested.
        if ExtensionInfo.is_extensions_locked(locked_extensions, extension,
                                              version or ''):
            raise ValueError("Extension '%s' install is prohibited. System extension" % extension)
        check_call(args=command, preexec_fn=self._demote())

    def extension_update(self, extension):
        """Run ``gem update`` for the given gem as the owning user."""
        check_call(self.gem(), 'update', extension,
                   preexec_fn=self._demote())

    def extension_uninstall(self, extension):
        """Uninstall a gem, optionally limited to ``name#version``.

        :raises ValueError: when the extension is reported as locked
        """
        locked_extensions = ExtensionInfo.get_locked_extensions(
            self.interpreter_name)
        extension, version = self._split_extension(extension)
        command = (self.gem(), 'uninstall', extension, '-x', '-a')
        if version is not None:
            command += ('-v', version)
        # The lock check receives '' when no version was requested.
        if ExtensionInfo.is_extensions_locked(locked_extensions, extension,
                                              version or ''):
            raise ValueError("Extension '%s' removal is prohibited" % extension)
        check_call(args=command, preexec_fn=self._demote())

    def extensions(self):
        """List locally installed gems as ``{name: {'doc', 'version'}}``.

        Gems reported as locked are listed only with the installed versions
        that are not in their locked list, and only when that list is
        non-empty; all other gems are listed with the version string
        reported by ``gem list --local``.
        """
        result = {}
        locked_extensions = ExtensionInfo.get_locked_extensions(
            self.interpreter_name)
        output = check_output(self.gem(), 'list', '--local',
                              preexec_fn=self._demote())
        for name, version in EXTENSION_PATTERN.findall(output):
            doc = ExtensionInfo.extension_doc(name)
            if not ExtensionInfo.is_extensions_locked(locked_extensions,
                                                      name, version):
                result[name] = {'doc': doc, 'version': version}
                continue
            locked_versions = locked_extensions.get(name)
            installed = {v.strip() for v in version.split(',')}
            # Sorted so the reported string does not depend on set
            # iteration order, which differs between interpreter runs.
            extra_versions = sorted(installed - set(locked_versions))
            if extra_versions and len(locked_versions) != 0:
                result[name] = {'doc': doc,
                                'version': ', '.join(extra_versions)}
        return result
def _abs_prefix(user=None, prefix=None):
    """Return ``<home directory>/<prefix>`` for a user.

    :param user: user name looked up in the passwd database; a falsy value
        selects the user running the current process
    :param prefix: directory name under the home directory; a falsy value
        selects ``DEFAULT_PREFIX``
    """
    directory = prefix or DEFAULT_PREFIX
    passwd_entry = pwd.getpwnam(user) if user else pwd.getpwuid(os.getuid())
    return os.path.join(passwd_entry.pw_dir, directory)
def environments(user=None, prefix=None):
    """Return an ``Environment`` for every directory under the user's prefix.

    An ``OSError`` raised while listing the prefix directory yields an
    empty list.
    """
    root = _abs_prefix(user, prefix)
    try:
        names = list_dirs(root)
    except OSError:
        return []
    return [Environment(entry, user, prefix) for entry in names]
def environments_dict(key, user=None, prefix=None):
    """Merge ``Environment.as_dict(key=key)`` of all environments into one dict.

    With a truthy ``key`` each environment contributes a single
    ``{<key value>: <description>}`` entry, so the result maps that value
    (for example the environment name) to its description.

    The per-environment dicts are merged with ``update``.  Passing their
    item lists straight to ``dict()`` makes it treat every list as one
    key/value pair and raise ``ValueError`` unless the list has exactly two
    elements.
    """
    merged = {}
    for env in environments(user, prefix):
        merged.update(env.as_dict(key=key))
    return merged
def environments_deepdict(key, user=None, prefix=None):
    """Merge ``Environment.as_deepdict(key=key)`` of all environments into one dict.

    With a truthy ``key`` each environment contributes a single
    ``{<key value>: <description>}`` entry, so the result maps that value
    (for example the environment name) to its description.

    The per-environment dicts are merged with ``update``.  Passing their
    item lists straight to ``dict()`` makes it treat every list as one
    key/value pair and raise ``ValueError`` unless the list has exactly two
    elements.
    """
    merged = {}
    for env in environments(user, prefix):
        merged.update(env.as_deepdict(key=key))
    return merged