# coding=utf-8
#
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
from collections import namedtuple
from typing import Dict, Iterator, List, Optional, Tuple, Union # NOQA
from clcommon import ClPwd
from clcommon.cpapi import NotSupported, resellers
from clveconfig import DEFAULT_PROVIDER, EMPTY_LIMITS, InheritedLimits, Limits, XmlConfigReader # NOQA
from lvectllib import DEFAULT_PACKAGE, LVE_DEFAULT, LVP_DEFAULT, panelpackagesdict, paneluserslist
# Lightweight records used by LimitsDataStorage.
#
# Package:  ``name`` of the package, ``provider`` (the Reseller record that
#           owns it) and the package's own ``limits`` (may be None).
Package = namedtuple('Package', ['name', 'provider', 'limits'])
# Reseller: ``name``, the reseller's ``defaults`` limits (None means the
#           reseller is not activated) and the reseller's own ``limits``.
Reseller = namedtuple('Reseller', ['name', 'defaults', 'limits'])
# User:     ``id`` (uid), the ``package`` and ``reseller`` records the user
#           belongs to, the user's own ``limits`` and ``is_panel_user``
#           (False for uids that are not reported by the control panel).
User = namedtuple('User', ['id', 'package', 'limits', 'reseller', 'is_panel_user'])
# InheritedLimit: a resolved limit ``value`` together with the ``source``
#           it was taken from (one of the *_SOURCE constants below).
# The typename must match the variable name: otherwise repr() shows the
# wrong class name and pickling fails on attribute lookup.
InheritedLimit = namedtuple('InheritedLimit', ['value', 'source'])

# Name of the key that holds the global default limits
DEFAULTS = 'DEFAULTS'

# Names of inheritance sources
OWN_SOURCE = 'own'
PACKAGE_SOURCE = 'package'
RESELLER_SOURCE = 'reseller'
GLOBAL_SOURCE = 'global'
BUILTIN_SOURCE = 'builtin'
class LimitsDataStorage:
    """
    This class is designed to merge data from xml config & control panel
    and provide user-friendly interface to work with lve limits
    (packages, resellers, defaults).

    What this class can do:
    - process information from xml config and control panel
    - re-pack structures for easy data access

    What this class should not do:
    - set limits in kernel
    - know anything about kernel limits

    The ``defaults``, ``packages``, ``users`` and ``resellers`` properties
    are computed on first access and cached on the instance. A cache is
    stored only after it has been built completely, so an exception raised
    while collecting data does not leave a half-filled cache behind.
    """

    # If we can't find Reseller with defined name,
    # we return Reseller with default name (DEFAULT_PROVIDER),
    # and without any limits (own and defaults)
    DEFAULT_RESELLER_OBJ = Reseller(
        name=DEFAULT_PROVIDER,
        defaults=None,
        limits=None,
    )

    # hack for cPanel; for some reason we do not allow apply limits of 'default' package
    # instead, we use 'virtual' package named VE_DEFAULT without any limits
    # FIXME: LU-936
    DEFAULT_PKG_OBJ = Package(
        name=DEFAULT_PACKAGE,
        provider=DEFAULT_RESELLER_OBJ,
        limits=None,
    )

    def __init__(self, _xml_config_reader=None):
        """
        :param _xml_config_reader: object used to read limits from the xml
            config; a new XmlConfigReader is created when it is omitted
            (or falsy).
        """
        self._pwd = ClPwd()
        self._xml_source = _xml_config_reader or XmlConfigReader()
        # Lazy caches; None means "not loaded yet"
        self._panel_resellers_list = None  # type: List[Optional[str]]
        self._packages = None  # type: Dict[Tuple[str, str], Package]
        self._users = None  # type: Dict[int, User]
        self._resellers = None  # type: Dict[str, Reseller]
        self._defaults = None  # type: Dict[str, Limits]

    @staticmethod
    def _limit_name_convert(limit_name):
        # type: (str) -> str
        """
        Convert 'mem' to 'vmem' and vice versa.
        Any other limit name is returned unchanged.
        """
        if limit_name == 'vmem':
            return 'mem'
        if limit_name == 'mem':
            return 'vmem'
        return limit_name

    def _get_panel_resellers(self):
        # type: () -> List[Optional[str]]
        """
        Get list of names of panel resellers (cached after the first call).
        If cpapi panel plugin doesn't support that method,
        we return empty list
        """
        if self._panel_resellers_list is None:
            try:
                self._panel_resellers_list = resellers()
            except NotSupported:
                self._panel_resellers_list = []
        return self._panel_resellers_list

    @staticmethod
    def _is_active_reseller(reseller):
        # type: (Reseller) -> bool
        """
        Not activated reseller doesn't have any limits,
        so a reseller is treated as active when its defaults are set.
        """
        return reseller.defaults is not None

    def _list_uids_from_ve_cfg(self):
        # type: () -> Iterator[int]
        """
        Yield the lve ids of users that are listed in the xml config.
        """
        for uid in self._xml_source.users_lve_ids():
            yield uid

    def _build_non_panel_user(self, uid):
        # type: (int) -> User
        """
        Build a User record for a uid that the control panel doesn't report:
        default package, default reseller and the limits stored
        in the xml config for that uid.
        """
        return User(
            id=uid,
            package=self.DEFAULT_PKG_OBJ,
            limits=self._xml_source.get_user_limits(uid),
            reseller=self.DEFAULT_RESELLER_OBJ,
            is_panel_user=False,
        )

    def _resolve_limits(self, own_values, fallbacks, builtin_defaults):
        # type: (Dict[str, object], List[Tuple[Optional[Limits], str]], Dict[str, object]) -> InheritedLimits
        """
        Shared inheritance mechanism for all ``get_*_limits`` methods.

        :param own_values: mapping "limit name -> own value"; None means
            that the entity doesn't define this limit itself
        :param fallbacks: ordered list of ``(limits, source_name)`` pairs;
            the first pair that has a non-None value for the limit wins.
            ``limits`` may be None, such entries are simply skipped
        :param builtin_defaults: mapping with builtin values, used when
            neither the entity nor any fallback defines the limit. It is
            indexed by the converted limit name (see _limit_name_convert)
        :return: InheritedLimits where each field is an InheritedLimit
        """
        resolved = {}
        for key, value in own_values.items():
            if value is not None:
                resolved[key] = InheritedLimit(value=value, source=OWN_SOURCE)
                continue
            # The builtin value is looked up before the fallbacks are checked,
            # so a limit name missing from the builtin table is always
            # reported, even when a fallback would have supplied the value.
            builtin_value = builtin_defaults[self._limit_name_convert(key)]
            for candidate, source in fallbacks:
                inherited_value = getattr(candidate, key, None)
                if inherited_value is not None:
                    resolved[key] = InheritedLimit(value=inherited_value, source=source)
                    break
            else:
                resolved[key] = InheritedLimit(value=builtin_value, source=BUILTIN_SOURCE)
        return InheritedLimits(**resolved)

    def get_user_by_uid(self, uid):
        # type: (int) -> User
        """
        This method will return User by uid.
        If we will pass uid of nonexistent user,
        method return User without any limits,
        with DEFAULT_RESELLER_OBJ and
        with DEFAULT_PKG_OBJ.
        This trick is needed for validating of limits
        for nonexistent user in case of migrating
        limits and entities from another server.
        """
        known_users = self.users
        if uid in known_users:
            return known_users[uid]
        return self._build_non_panel_user(uid)

    def get_package_by_name_and_reseller(self, name, reseller):
        # type: (str, str) -> Package
        """
        This method is needed because of lvectl bug "by design"
        our api designed only for situations when user's owner is
        the owner of the package that contains that user
        but sometimes user can be in admin's package.
        So the package is searched first among the reseller's packages
        and then among the packages of DEFAULT_PROVIDER.
        We will return entity Package without
        any limits and with DEFAULT_RESELLER_OBJ if
        we request name of nonexistent package.
        This trick is needed for validating of limits
        for nonexistent package in case of migrating
        limits and entities from another server
        """
        known_packages = self.packages
        for key in ((reseller, name), (DEFAULT_PROVIDER, name)):
            if key in known_packages:
                return known_packages[key]
        return Package(
            name=name,
            provider=self.DEFAULT_RESELLER_OBJ,
            limits=self._xml_source.get_package_limits(
                name,
                DEFAULT_PROVIDER,
            )
        )

    def get_reseller_by_name(self, name):
        # type: (str) -> Reseller
        """
        Return Reseller by name or DEFAULT_RESELLER_OBJ
        if the panel doesn't have a reseller with that name.
        """
        known_resellers = self.resellers
        if name in known_resellers:
            return known_resellers[name]
        return self.DEFAULT_RESELLER_OBJ

    @property
    def defaults(self):
        # type: () -> Dict[str, Limits]
        """
        Default limits of every panel reseller (key is the reseller name)
        plus global default limits under the DEFAULTS key.
        """
        if self._defaults is None:
            collected = {}
            for name in self._get_panel_resellers():
                collected[name] = self._xml_source.get_reseller_default_limits(name)
            # Global defaults are written last, as before
            collected[DEFAULTS] = self._xml_source.defaults()
            # Publish the cache only when it is complete
            self._defaults = collected
        return self._defaults

    @property
    def packages(self):
        # type: () -> Dict[Tuple[str, str], Optional[Package]]
        """
        Panel packages; key is a tuple (provider name, package name).
        """
        if self._packages is None:
            collected = {}
            for provider, provider_packages in panelpackagesdict().items():
                for package_name in provider_packages:
                    collected[(provider, package_name)] = Package(
                        name=package_name,
                        provider=self.get_reseller_by_name(provider),
                        limits=self._xml_source.get_package_limits(
                            package_name,
                            provider,
                        )
                    )
            # Publish the cache only when it is complete
            self._packages = collected
        return self._packages

    @property
    def users(self):
        # type: () -> Dict[int, User]
        """
        Users by uid: all users reported by the control panel plus
        uids that are present only in the xml config
        (those get default package and default reseller).
        """
        if self._users is None:
            collected = {}
            for uid, reseller, package in paneluserslist():
                collected[uid] = User(
                    id=uid,
                    reseller=self.get_reseller_by_name(reseller),
                    package=self.get_package_by_name_and_reseller(package, reseller),
                    limits=self._xml_source.get_user_limits(uid),
                    is_panel_user=True,
                )
            # Panel users have priority over entries found only in the xml config
            for uid in self._list_uids_from_ve_cfg():
                if uid not in collected:
                    collected[uid] = self._build_non_panel_user(uid)
            # Publish the cache only when it is complete
            self._users = collected
        return self._users

    @property
    def resellers(self):
        # type: () -> Dict[str, Reseller]
        """
        Panel resellers by name, with default and own limits from the xml config.
        """
        if self._resellers is None:
            collected = {}
            for reseller_name in self._get_panel_resellers():
                collected[reseller_name] = Reseller(
                    name=reseller_name,
                    defaults=self._xml_source.get_reseller_default_limits(reseller_name),
                    limits=self._xml_source.get_reseller_limits(reseller_name)
                )
            # Publish the cache only when it is complete
            self._resellers = collected
        return self._resellers

    def get_packages_with_custom_limits(self):
        # type: () -> Dict[Tuple[str, str], Package]
        """
        Return only packages whose own limits are set and differ from EMPTY_LIMITS.
        """
        return {
            key: package for key, package in self.packages.items()
            if package.limits is not None and package.limits != EMPTY_LIMITS
        }

    def get_users_with_custom_limits(self):
        # type: () -> Dict[int, User]
        """
        Return only panel users whose own limits are set and differ from EMPTY_LIMITS.
        """
        return {
            uid: user for uid, user in self.users.items()
            if user.limits is not None and user.limits != EMPTY_LIMITS and user.is_panel_user
        }

    def get_package_limits(self, package):
        # type: (Package) -> InheritedLimits
        """
        Method will return inherited limits for package.
        Order of inheritance: own limits, defaults of package's provider,
        global defaults (only if the provider is not activated), builtin.
        """
        reseller_defaults = package.provider.defaults  # type: Limits
        global_defaults = self.defaults[DEFAULTS]  # type: Limits
        own_values = (package.limits or EMPTY_LIMITS)._asdict()

        fallbacks = [(reseller_defaults, RESELLER_SOURCE)]
        # Package's provider doesn't have Limits if it's deactivated
        # and package is inherited limits from global defaults.
        # Package isn't inherited from global defaults if
        # its provider (reseller) is activated.
        if not self._is_active_reseller(package.provider):
            fallbacks.append((global_defaults, GLOBAL_SOURCE))
        return self._resolve_limits(own_values, fallbacks, LVE_DEFAULT)

    def get_user_limits(self, user):
        # type: (User) -> InheritedLimits
        """
        Method will return inherited limits for user.
        Order of inheritance: own limits, limits of user's package,
        defaults of user's reseller, global defaults (only if the
        reseller is not activated), builtin.
        """
        global_defaults = self.defaults[DEFAULTS]  # type: Limits
        reseller_defaults = user.reseller.defaults  # type: Limits
        package_limits = user.package.limits  # type: Limits
        own_values = (user.limits or EMPTY_LIMITS)._asdict()

        fallbacks = [
            (package_limits, PACKAGE_SOURCE),
            (reseller_defaults, RESELLER_SOURCE),
        ]
        # User's reseller doesn't have Limits if it's deactivated
        # and user is inherited limits from global defaults.
        # User isn't inherited from global defaults if
        # its reseller is activated.
        if not self._is_active_reseller(user.reseller):
            fallbacks.append((global_defaults, GLOBAL_SOURCE))
        return self._resolve_limits(own_values, fallbacks, LVE_DEFAULT)

    def get_reseller_limits(self, reseller):
        # type: (Reseller) -> InheritedLimits
        """
        Method will return limits equal builtin for non-activated reseller
        and inherited limits (own or builtin) for activated reseller.
        Builtin values are taken from LVP_DEFAULT.
        """
        # We return limits equal builtin for non-activated reseller
        # while activation of reseller.
        # It is for the corner case when we check input limits at the moment
        # of activation of the reseller
        if not self._is_active_reseller(reseller):
            builtin_only = {}
            for name in ('cpu', 'ncpu', 'io', 'vmem', 'pmem', 'nproc', 'ep', 'iops'):
                builtin_only[name] = InheritedLimit(
                    # 'vmem' is stored in LVP_DEFAULT under the 'mem' key
                    value=LVP_DEFAULT[self._limit_name_convert(name)],
                    source=BUILTIN_SOURCE,
                )
            return InheritedLimits(**builtin_only)

        # Own limits may be absent (None) even for an activated reseller;
        # treat that like empty limits, the same way packages and users do.
        own_values = (reseller.limits or EMPTY_LIMITS)._asdict()
        return self._resolve_limits(own_values, [], LVP_DEFAULT)

    def get_defaults_limits(self, reseller):
        # type: (Union[Reseller, DEFAULTS]) -> InheritedLimits
        """
        Method will return empty inherited defaults limits for non-activated reseller
        or inherited defaults limits for activated reseller.
        For global defaults (``reseller == DEFAULTS``) method will return
        inherited limits built from global defaults.
        Builtin values are taken from LVE_DEFAULT.
        """
        global_defaults = self.defaults[DEFAULTS]  # type: Limits
        if reseller == DEFAULTS:
            own_values = global_defaults._asdict()
        elif not self._is_active_reseller(reseller):
            # We return empty inherited limits for non-activated reseller,
            # because it doesn't have any limits.
            return InheritedLimits(**EMPTY_LIMITS._asdict())
        else:
            own_values = reseller.defaults._asdict()
        return self._resolve_limits(own_values, [], LVE_DEFAULT)