# coding:utf-8
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2022 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
from typing import Tuple, Union
from docopt import DocoptExit, docopt
from schema import Schema, And, Use, Or, SchemaError
from clcommon.const import Feature
from clcommon.cpapi import is_panel_feature_supported
from .utils import VALUES_STR
# Program name used as the prefix of every usage line.
PROG_NAME = "cloudlinux-limits"

# Limit names grouped by the subsystem they belong to.
AVAILABLE_LVE_KEYS = {"speed", "nproc", "pmem", "vmem", "maxEntryProcs", "io", "iops"}
# Same set as the LVE keys, except that "vmem" is not available.
AVAILABLE_LVP_KEYS = AVAILABLE_LVE_KEYS - {"vmem"}
AVAILABLE_QUOTA_KEYS = {"inodes"}
AVAILABLE_MYSQL_KEYS = {"mysql-cpu", "mysql-io"}

# Union of all limit names known to the utility.
AVAILABLE_KEYS = AVAILABLE_LVE_KEYS.union(
    AVAILABLE_QUOTA_KEYS,
    AVAILABLE_MYSQL_KEYS,
    AVAILABLE_LVP_KEYS,
)
# Values accepted by --default: any known limit name or the word "all".
AVAILABLE_DEFAULTS = AVAILABLE_KEYS | {"all"}

# Values accepted by --limits; the set is narrower when the panel reports
# that the LVE feature is not supported.
AVAILABLE_LIMITS = (
    AVAILABLE_KEYS | {"mysql-gov", "mysql-restrict", "cagefs"}
    if is_panel_feature_supported(Feature.LVE)
    else AVAILABLE_QUOTA_KEYS | {"cagefs"}
)

AVAILABLE_MYSQL_KEYS_ALL = AVAILABLE_MYSQL_KEYS | {"mysql-restrict", "mysql-unrestrict-all", "mysql-gov"}

# Options that are rejected when they are combined with --for-reseller.
ADMIN_ONLY_OPTIONS = [
    "--mysql-restrict",
    "--mysql-unrestrict-all",
    "--mysql-gov",
    "--mysql-io",
    "--mysql-cpu",
    "--cagefs",
    "--inodes",
]
def _default_keys_validate(keys):
    """
    Check the limit names supplied with --default.

    :param keys: iterable of limit names (the schema passes the value
                 already split on commas)
    :return: True when every name belongs to AVAILABLE_DEFAULTS
    """
    requested = set(keys)
    return requested.issubset(AVAILABLE_DEFAULTS)
def _limits_keys_validate(keys):
    """
    Check the limit names supplied with --limits.

    :param keys: comma-separated string of limit names
    :return: True when every name belongs to AVAILABLE_LIMITS
    """
    requested = set(keys.split(","))
    allowed = set(AVAILABLE_LIMITS)
    return requested <= allowed
# Uncomment this to add full edit mode
# gov_int_list = And(Use(lambda x:x.split(",")), lambda x:len(x) == 4,
# Use(lambda x:map(int, x)))
# Validation rules for the dictionary produced by docopt.
# Entries that are exactly ``bool`` describe commands and flags; entries
# wrapped in ``Or(None, ...)`` also accept None in place of a validated value.
global_schema = Schema({
    "set": bool,
    "get": bool,
    "disable-reseller-limits": bool,
    "enable-reseller-limits": bool,
    "--json": And(
        bool,
        lambda flag: flag,
        error="use --json option, other modes currently unsupported",
    ),
    "--human-readable-numbers": bool,
    "--unlimited": bool,
    "--default": Or(
        None,
        And(Use(lambda raw: raw.split(',')), _default_keys_validate),
        error="Invalid keys",
    ),
    "--save-all-parameters": bool,
    "--lve-id": Or(
        None,
        And(Use(int), lambda lve_id: int(lve_id) >= 0),
        And(Use(lambda raw: str(raw).lower()), lambda text: text == "default"),
        error="--lve-id must be non-negative integer value or 'default'",
    ),
    "--reseller-id": Or(
        None,
        And(Use(int), lambda reseller_id: int(reseller_id) >= 0),
        error="--reseller-id must be non-negative integer value",
    ),
    "--reseller-name": Or(
        None,
        And(str, lambda name: not name.isdigit()),
        error="Invalid reseller name",
    ),
    "--all": bool,
    "--for-reseller": Or(
        None,
        And(str, lambda name: not name.isdigit()),
        error="Invalid reseller name",
    ),
    "--username": Or(None, str),
    "--domain": Or(None, str),
    "--limits": Or(None, _limits_keys_validate, error="Invalid keys"),
    "--mysql-gov": Or(
        None,
        lambda state: state in ("ignored", "watched"),
        error=f"{VALUES_STR}: 'ignored', 'watched'",
    ),
    "--cagefs": Or(
        None,
        lambda state: state in ("enabled", "disabled"),
        error=f"{VALUES_STR}: 'enabled', 'disabled'",
    ),
    "--mysql-restrict": Or(
        None,
        lambda state: state in ("restricted", "unrestricted"),
        error=f"{VALUES_STR}: 'restricted', 'unrestricted'",
    ),
    "--mysql-unrestrict-all": bool,
    # The limits below are only checked to be strings (or None) here.
    "--speed": Or(None, str),
    "--pmem": Or(None, str),
    "--vmem": Or(None, str),
    "--nproc": Or(None, str),
    "--io": Or(None, str),
    "--iops": Or(None, str),
    "--maxEntryProcs": Or(None, str),
    "--mysql-cpu": Or(
        None,
        And(Use(int), lambda pct: pct >= 0),
        error="--mysql-cpu must be non-negative integer value",
    ),
    # Uncomment this to add full edit mode
    # "--mysql-io": Or(None, And(Use(int), lambda x:x >= 0), gov_int_list,
    "--mysql-io": Or(
        None,
        And(Use(int), lambda rate: rate >= 0),
        error="--mysql-io must be non-negative integer value",
    ),
    # Uncomment this to add full edit mode
    # "--mysql-read": Or(None, And(Use(int), lambda x:x >= 0), gov_int_list,
    # error="--mysql-read must be non-negative integer value or 4 int numbers with comma"),
    # "--mysql-write": Or(None, And(Use(int), lambda x:x >= 0), gov_int_list,
    # error="--mysql-write must be non-negative integer value or 4 int numbers with comma"),
    "--inodes": Or(None, str),
    "--help": bool,
})
# Usage patterns of the full-featured utility, without the program name.
# Long patterns are assembled from adjacent string literals.
USAGE_CMDS = (
    # set: switch limits to unlimited
    "set [--json] (--lve-id <int> | --username <str>) (--unlimited) [--for-reseller <str>]",
    # set: reset limits to defaults
    "set [--json] (--lve-id <int> | --username <str> | --reseller-name <str>) (--default <str>)",
    # set: MySQL governor / CageFS / dbctl state
    (
        "set [--json] (--lve-id <int> | --username <str>) "
        "(--mysql-gov <ignored,watched> | --cagefs <enabled,disabled> | --mysql-restrict <restricted,unrestricted>)"
    ),
    "set [--json] (--mysql-unrestrict-all)",
    # set: explicit limit values
    (
        "set [--json] (--lve-id <int> | --username <str>) "
        "[--speed <str> --io <str> --nproc <str> --pmem <str> --vmem <str> --iops <str> --inodes <N,M> "
        "--maxEntryProcs <str> --mysql-cpu <int> --mysql-io <int> --save-all-parameters]"
    ),
    (
        "set [--json] (--lve-id <int> | --username <str> | --reseller-name <str>) "
        "[--speed <str> --io <str> --nproc <str> --pmem <str> --vmem <str> --iops <str> --maxEntryProcs <str>] "
        "[--default <str>] [--for-reseller <str>]"
    ),
    "set [--json] (--reseller-name <str>) (--unlimited)",
    # get (the command word itself is optional)
    (
        "[get] [--json] [--lve-id <int> | --username <str> | --reseller-name <str> | --domain <str>] "
        "[--limits=<keys>] [--human-readable-numbers] [--for-reseller <str>]"
    ),
    # reseller limits switches
    "disable-reseller-limits (--reseller-id <int> | --reseller-name <str>) [--json]",
    "enable-reseller-limits ((--reseller-id <int> | --reseller-name <str>) | --all) [--json]",
    "(-h | --help)",
)

# One usage line per pattern, each prefixed with the program name.
USAGE = "\n ".join(" ".join((PROG_NAME, pattern)) for pattern in USAGE_CMDS).strip()
class CloudlinuxLimitsOptsParser:
    """
    Parser and validator of cloudlinux-limits command-line arguments.

    parse_args() is the entry point. It rejects admin-only options combined
    with --for-reseller, parses argv with docopt using ``docstring``, checks
    option combinations that the usage patterns do not express and validates
    the parsed values with the schema returned by _get_schema().
    """

    # Help text consumed by docopt. The layout is significant: docopt only
    # treats indented lines as part of the "Usage:" / "Options:" sections, and
    # an option must be separated from its description by at least two spaces,
    # otherwise the description words are read as the option's argument.
    docstring = f"""Utility to get/set any Cloudlinux limits

Usage:
    {USAGE}

Options:
    --json                             Return data in JSON format.
    --lve-id <int>                     LVE id. will display record only for that LVE id.
    --username <str>                   Execute command only for specific user.
    --reseller-name <str>              Execute command only for specific reseller.
    --reseller-id <int>                Execute command only for specific reseller.
    --all                              Execute command for all resellers
    --for-reseller <str>               Use supplied reseller for get/set data.
    --domain <str>                     Show data only for specific domain
    --limits <keys>                    Available keys: speed,nproc,pmem,vmem,maxEntryProcs,io,
                                       iops,mysql-gov,mysql-cpu,mysql-io,cagefs,inodes
    --human-readable-numbers           Return PMEM and VMEM limits in KBytes, MBytes or GBytes
    --unlimited                        Set all limits to unlimited.
    --default [limits]                 Reset limits to the Package defaults.
                                       List of comma-separated limits to reset them to default or "all"
    --mysql-gov <ignored|watched>      Monitor or ignore by MySQL governor.
    --cagefs <enabled|disabled>        Enable or disable CageFS for a user.
    --mysql-restrict <[un]restricted>  Set user restrict status with dbctl (restricted or unrestricted).
    --mysql-unrestrict-all             Unrestrict all restricted users with dbctl.
    --speed <str>                      Limit CPU usage for LVE | LVP.
    --pmem <str>                       Limit physical memory usage for applications inside LVE | LVP.
    --vmem <str>                       Limit virtual memory for applications inside LVE.
    --nproc <str>                      Limit number of processes for LVE | LVP.
    --io <str>                         Define io limits for LVE | LVP (KB/s).
    --iops <str>                       Limit io per second for LVE | LVP.
    --maxEntryProcs <str>              Limit number of entry processes for LVE | LVP.
    --mysql-cpu <int>                  Set MySQL governor CPU limit (pct).
    --mysql-io <int>                   Set MySQL governor IO limit (read + write MB/s)
    --inodes <N,M>                     Set inode limits. N - soft, M - hard.
    --save-all-parameters              Save all parameters even if they match with defaults settings.
    -h, --help                         Show this help message and exit
"""

    # Add these parameters to the docstring when the read/write/cpu full edit
    # mode is needed.
    # This line is for the Usage block:
    # {0} set [--json] (--lve-id <int> | --username <str>) \
    # [--speed <int> --io <int> --nproc <int> --pmem <int> --vmem <int> --iops <int> --inodes <N,M> \
    # --maxEntryProcs <int> --mysql-cpu <int:int,int,int,int> --mysql-read <int:int,int,int,int> \
    # --mysql-write <int:int,int,int,int> --mysql-io <int:int,int,int,int> --save-all-parameters]
    # These lines are for the Options block:
    # --mysql-cpu <int:int,int,int,int> Set MySQL governor CPU limit (pct).
    # --mysql-io <int:int,int,int,int> Set MySQL governor IO limit (read + write KB/s)
    # --mysql-read <int:int,int,int,int> Set MySQL governor read limit (KB/s)
    # --mysql-write <int:int,int,int,int> Set MySQL governor write limit (KB/s)

    # Commands known to this parser; parse_args() falls back to "get" when
    # none of them is present in the parsed arguments.
    commands = ("set", "get", "disable-reseller-limits", "enable-reseller-limits")

    @staticmethod
    def _is_option_compatible_with_admins(option: str, argv: list) -> Tuple[bool, str]:
        """
        Check that ``option`` is not combined with an admin-only option.

        Arguments are matched by prefix, so both ``--opt value`` and
        ``--opt=value`` spellings are recognised.

        :param option: option name to look for, e.g. "--for-reseller"
        :param argv: raw command-line arguments
        :return: tuple (flag, message)
                 flag: True/False - compatible/not compatible
                 message: error text when flag is False, "" otherwise
        """
        option_found = False
        admin_option_name = ""
        for arg in argv:
            if arg.startswith(option):
                option_found = True
                continue
            if not admin_option_name:
                # remember the first admin-only option met on the command line
                admin_option_name = next(
                    (admin_option for admin_option in ADMIN_ONLY_OPTIONS
                     if arg.startswith(admin_option)),
                    "",
                )
        if option_found and admin_option_name:
            # report the option that was actually checked instead of a fixed name
            return False, f"ERROR: option '{admin_option_name}' is not compatible with '{option}'"
        return True, ""

    def _perform_checks_before_docopt(self, argv: list) -> Tuple[bool, str]:
        """
        Performs checks on raw arguments before they are parsed.

        :param argv: raw command-line arguments
        :return: tuple (ok, message); message is "" when ok is True
        """
        return self._is_option_compatible_with_admins("--for-reseller", argv)

    def _perform_checks_after_docopt(self, args: dict) -> Tuple[bool, str]:
        """
        Performs checks on the arguments parsed by docopt.

        :param args: dictionary returned by docopt
        :return: tuple (ok, message); message is "" when ok is True
        """
        # cloudlinux-limits set/get --json --reseller-name=reseller --for-reseller=reseller -- ERROR
        if args["--reseller-name"] and args["--for-reseller"]:
            return False, "ERROR: '--reseller-name' and '--for-reseller' cannot be used together"
        return True, ""

    def _apply_docopt(self, argv: list, is_json: bool) -> Tuple[dict, str]:
        """
        Applies docopt to parse utility arguments.
        Performs checks before and after parsing.

        :param argv: raw command-line arguments
        :param is_json: when False, the help text is appended to the
                        "invalid parameter" error message
        :return: tuple (args, message); args is an empty dict on failure
        """
        ok, message = self._perform_checks_before_docopt(argv)
        if not ok:
            return {}, message
        try:
            args = docopt(self.docstring, argv)
        except DocoptExit:
            s_error_string = "ERROR: Invalid parameter passed or required parameter is missing"
            if not is_json:
                s_error_string += "\n\n" + self.docstring
            return {}, s_error_string
        ok, message = self._perform_checks_after_docopt(args)
        if not ok:
            return {}, message
        return args, ""

    @staticmethod
    def _get_schema() -> Schema:
        """
        Returns schema that should be used to validate parsed arguments
        """
        return global_schema

    @staticmethod
    def _validate_schema(schema: Schema, args: dict) -> Tuple[dict, str]:
        """
        Validate parsed arguments with provided schema.

        :param schema: schema to validate against
        :param args: dictionary returned by docopt
        :return: tuple (validated args, ""), or ({}, error text) when the
                 schema rejects the arguments
        """
        try:
            args = schema.validate(args)
        except SchemaError as e:
            return {}, str(e)
        return args, ""

    def parse_args(self, argv: list, is_json: bool = False) -> Tuple[bool, Union[str, dict]]:
        """
        Parses and validates arguments passed to the utility.

        :param argv: raw command-line arguments
        :param is_json: passed to _apply_docopt(); controls whether the help
                        text is added to the docopt error message
        :return: (True, validated arguments dict) on success,
                 (False, error message) otherwise
        """
        args, message = self._apply_docopt(argv, is_json)
        if not args:
            return False, message
        # if command was not specified, use `get` command
        if not any(args.get(command) for command in self.commands):
            args["get"] = True
        schema = self._get_schema()
        args, message = self._validate_schema(schema, args)
        if not args:
            return False, message
        return True, args
class CloudlinuxLimitsNoLveOptsParser(CloudlinuxLimitsOptsParser):
    """
    Arguments parser with a reduced option set.

    Only the set/get commands with the CageFS, inodes, --unlimited and
    --default options are accepted. parse_args() still returns a dictionary
    that contains every key of the global schema, so code reading the other
    options keeps working.
    NOTE(review): presumably used when the LVE feature is not supported -
    confirm against the caller.
    """

    # Help text consumed by docopt. Usage and option lines must be indented
    # and every option must be separated from its description by at least two
    # spaces, otherwise docopt reads the description as the option's argument.
    docstring = f"""Utility to get/set any Cloudlinux limits

Usage:
    {PROG_NAME} set [--json] (--lve-id <int> | --username <str>) (--cagefs <enabled,disabled>)
    {PROG_NAME} set [--json] (--lve-id <int> | --username <str>) (--unlimited)
    {PROG_NAME} set [--json] (--lve-id <int> | --username <str>) (--default <str>)
    {PROG_NAME} set [--json] (--lve-id <int> | --username <str>) [--inodes <N,M> --save-all-parameters]
    {PROG_NAME} [get] [--json] [--lve-id <int> | --username <str> | --domain <str>] [--limits=<keys>]
    {PROG_NAME} (-h | --help)

Options:
    --json                       Return data in JSON format.
    --lve-id <int>               LVE id. will display record only for that LVE id.
    --username <str>             Execute command only for specific user.
    --cagefs <enabled|disabled>  Enable or disable CageFS for a user.
    --unlimited                  Set supported limits to unlimited.
    --default [limits]           Reset supported limits to the Package defaults.
                                 List of comma-separated limits to reset them to default or "all"
    --domain <str>               Show data only for specific domain
    --limits <keys>              Available keys: inodes
    --inodes <N,M>               Set inode limits. N - soft, M - hard.
    --save-all-parameters        Save all parameters even if they match with defaults settings.
    -h, --help                   Show this help message and exit
"""

    commands = ("set", "get")

    def _perform_checks_before_docopt(self, argv: list) -> Tuple[bool, str]:
        """
        No pre-parse checks are needed for the reduced option set.
        """
        return True, ""

    def _perform_checks_after_docopt(self, args: dict) -> Tuple[bool, str]:
        """
        No post-parse checks are needed for the reduced option set.
        """
        return True, ""

    @staticmethod
    def _get_schema() -> Schema:
        """
        Returns the global schema restricted to the options this parser
        accepts; rules for all other options are left out.
        """
        schema_options = {"set", "get", "--json", "--lve-id", "--username", "--cagefs", "--domain",
                          "--unlimited", "--default", "--limits", "--inodes", "--save-all-parameters", "--help"}
        options = {
            option: value for option, value in global_schema.schema.items()
            if option in schema_options
        }
        return Schema(options)

    def parse_args(self, argv: list, is_json: bool = False) -> Tuple[bool, Union[str, dict]]:
        """
        Parses and validates arguments, then pads the result with the
        options that only the full parser knows about.

        :param argv: raw command-line arguments
        :param is_json: forwarded to the parent implementation
        :return: (True, arguments dict) on success,
                 (False, error message) otherwise
        """
        ok, args = super().parse_args(argv, is_json)
        if not ok:
            return False, args
        # Fill the parsed arguments dictionary with the remaining keys of the
        # global schema using negative values (False for flags, None for the
        # rest), because these parameters can be checked in the utility.
        for key, value in global_schema.schema.items():
            args.setdefault(key, False if value is bool else None)
        return True, args