#!/opt/cloudlinux/venv/bin/python3 -bb
# -*- coding: utf-8 -*-
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
import configparser as ConfigParser
import csv
import fcntl
import os
import pwd
import re
import tempfile
from collections import defaultdict
from stat import S_IRGRP, S_IROTH, S_IRUSR, S_IWUSR, ST_DEV
from typing import Dict, List, Optional, Tuple
import clcontrollib
import cldetectlib
from clcommon import FormattedException
# pylint: enable=E0611
from clcommon.clpwd import ClPwd
from clcommon.clquota import check_quota_enabled
from clcommon.cpapi import admin_packages, list_users, resellers_packages
from clcommon.cpapi.cpapiexceptions import CPAPIExternalProgramFailed, EncodingError
from clcommon.utils import (
ExternalProgramFailed,
get_file_lines,
get_filesystem_type,
run_command,
write_file_lines,
)
# Result of clcontrollib's DirectAdmin detection, evaluated once at import time
IS_DA = clcontrollib.detect.is_da()
# NOTE(review): not referenced in the visible part of this file -- confirm it is still used
DEFAULT_PACKAGE = 'default'
VE_DEFAULT_PACKAGE = 'VE_DEFAULT' # virtual package, alias for uid=0
class NoSuchPackageException(Exception):
    """Signals that the requested package name is not known."""

    def __init__(self, package):
        text = "No such package (%s)" % (package,)
        super().__init__(text)
class NoSuchUserException(Exception):
    """Signals that the requested user (uid) is not known."""

    def __init__(self, user):
        text = "No such user (%s)" % (user,)
        super().__init__(text)
class InsufficientPrivilegesException(Exception):
    """Signals that the operation was attempted without the required privileges."""

    def __init__(self):
        super().__init__("Insufficient privileges")
class IncorrectLimitFormatException(Exception):
    """Signals that a limit value could not be parsed."""

    def __init__(self, limit):
        text = "Incorrect limit format (%s)" % (limit,)
        super().__init__(text)
class MalformedConfigException(FormattedException):
    """
    Signals that the config file could not be parsed,
    so cl-quota is unable to work with it
    """

    def __init__(self, error: ConfigParser.ParsingError):
        # The parser error text is substituted into the message
        # through the 'context' mapping.
        message = (
            "cl-quota can't work because for malformed config. "
            "Please, contact CloudLinux support if you "
            "need help with resolving this issue. "
            "Details: %(error_message)s"
        )
        super().__init__({
            'message': message,
            'context': {'error_message': str(error)},
        })
class GeneralException(Exception):
    """Generic error carrying a caller-supplied message."""

    def __init__(self, message):
        super().__init__(message)
class QuotaDisabledException(Exception):
    """Signals that quota is disabled for every user on the server."""

    def __init__(self):
        Exception.__init__(self, 'Quota disabled for all users on server')
class UserQuotaDisabledException(QuotaDisabledException):
    """
    Signals that quota is disabled for one particular user
    """

    def __init__(self, uid=None, homedir=None, message=None):
        # Build the message from the pieces that were actually supplied
        # (falsy values are skipped).
        fragments = ['Quota disabled']
        if uid:
            fragments.append(' for user id %s' % uid)
        if homedir:
            fragments.append(' (home directory %s)' % homedir)
        if message:
            fragments.append('; %s' % message)
        # Calls Exception.__init__ directly so the parent's fixed
        # "all users" message is not used.
        Exception.__init__(self, ''.join(fragments))
def _is_sys_path(path):
"""
>>> _is_sys_path('/home/username')
False
>>> _is_sys_path('/var/davecot')
True
"""
if path[-1] != '/':
path += '/'
sys_path_ = ('/root/', '/usr/', '/var/', '/sbin/', '/dev/', '/bin/', '/srv/', '/sys/', '/etc/ntp/')
if path == '/':
return True
for path_ in sys_path_:
if path.startswith(path_):
return True
def _get_users_list():
    """
    Return the uids of users whose home directory is not a system path
    """
    passwd_entries = ClPwd().get_user_dict()
    uids = []
    for name in passwd_entries:
        entry = passwd_entries[name]
        if _is_sys_path(entry.pw_dir):
            continue
        uids.append(entry.pw_uid)
    return uids
def is_quota_inheritance_enabled() -> bool:
    """
    Read the `cl_quota_inodes_inheritance` boolean from the CloudLinux config file
    (False is passed as the default value)
    """
    return cldetectlib.get_boolean_param(
        cldetectlib.CL_CONFIG_FILE,
        'cl_quota_inodes_inheritance',
        default_val=False,
    )
class QuotaWrapper(object):
    """
    Base quota class for inode quotas handling
    * Update system quotas via setquota
    * Retrieves system quotas via repquota
    * Stores current quotas in /etc/container/cl-quotas.dat file
    * Maintains /etc/container/cl-quotas.cache file with resolved quotas. That file can be read by non-privileged users
    """
    # Paths of the system files and external utilities used by this class
    PROC_MOUNTS = '/proc/mounts'
    QUOTASYNC = '/usr/bin/quotasync'
    SETQUOTA = '/usr/sbin/setquota'
    REPQUOTA = '/usr/sbin/repquota'
    GETPACKS = '/usr/bin/getcontrolpaneluserspackages'
    # Saved limits (read with ConfigParser) and the CSV cache of current quotas
    DATAFILE = '/etc/container/cl-quotas.dat'
    CACHEFILE = '/etc/container/cl-quotas.cache'
    # File lock variables
    LOCK_FD = None
    LOCK_FILE = DATAFILE + '.lock'
    LOCK_WRITE = False
def __init__(self):
    """
    Check that the required files exist and initialise the instance state.

    :raises RuntimeError: when /proc/mounts, repquota or setquota is missing
    """
    self._assert_file_exists(QuotaWrapper.PROC_MOUNTS)
    self._assert_file_exists(QuotaWrapper.REPQUOTA)
    self._assert_file_exists(QuotaWrapper.SETQUOTA)
    # uids for which the quota-enabled check has already passed
    self._quota_enabled_list = list()
    # Lazily detected: None until _check_present_panel() is called
    self._panel_present = None
    # Grace times parsed from repquota output
    self._grace = {}
    # Cache of current quotas, filled on first use
    self._quota = {}
    # Pending `setquota -b` input, keyed by device
    self._device_quota = {}
    self._package_to_uids_map = {} # type: Dict[str, List[str]]
    self._uid_to_packages_map = {} # type: Dict[str, List[str]]
    self._uid_to_homedir_map = {} # type: Dict[str, str]
    # Saved limits loaded from the data file
    self._dh = self._get_saved_data_handler()
    # Column names; the last three are the inode fields
    self._fields = ['bytes_used', 'bytes_soft', 'bytes_hard', 'inodes_used', 'inodes_soft', 'inodes_hard']
    self._euid = os.geteuid()
    self._devices = self._load_quota_devices()
    self._mountpoint_device_mapped = self._get_mountpoint_device_map(self._devices)
    # Lazily built by _get_device_user_map()
    self._device_user_map = None
    # List of all packages (all admin's packages + all reseller packages)
    self._all_package_list = None
@staticmethod
def _assert_file_exists(path):
"""
Checks if command is present and exits if no
"""
if not os.path.exists(path):
raise RuntimeError('No such command (%s)' % (path,))
def __enter__(self):
return self
def __exit__(self, _type, _value, _traceback):
if self.LOCK_FD is not None:
self.LOCK_FD.close()
def get_user_limits(self, uid):
'''
Returns user limits converted to tuples
'''
return self._convert_data_to_tuples(self._get_current_quotas(uid))
def get_all_users_limits(self):
'''
Returns all user limits converted to tuples
'''
return self._convert_data_to_tuples(self._get_current_quotas())
def get_package_limits(self, package):
    """
    Return package limits converted to tuples (called only from main)
    :param package: Package name to get limits for. If falsy, limits of all
        used and admin's packages are returned, else only the supplied package
    """
    package_quotas = self._get_package_quotas(packname=package)
    return self._convert_data_to_tuples(package_quotas)
def get_all_packages_limits(self, package=None):
    """
    Return limits converted to tuples, looking through all packages
    (called only from main)
    :param package: optional package name; when given only that package is returned
    """
    package_quotas = self._get_package_quotas(packname=package, all_packages=True)
    return self._convert_data_to_tuples(package_quotas)
def _preprocess_limit(self, limit):
"""
Preprocessed passed limit: 'default' --> '0', 'unlimited' --> -1, else calls _check_limit
:param limit:
:return:
"""
if limit == 'default':
return '0'
if limit in ('unlimited', '-1'):
return '-1'
return self._check_limit(limit)
def _get_package_from_dh(self, package):
return self._dh.get('packages', package).split(':')
def _get_all_packages_with_limits(self, clean_dead_packages=False):
    """
    Retrieve all available packages with their limits
    :param clean_dead_packages: if True - remove all nonexistent packages from cl-quotas.dat
    :return: Dictionary: { 'package_name': (soft_limit, hard_limit) }
    """
    # result dictionary
    package_limits_dict = {}
    # Load packages limits from cl-quota.dat
    db_packages = {}
    if self._dh.has_section('packages') and len(self._dh.items('packages')) > 0:
        list_of_packages = self._get_all_package_list()
        for package in self._dh.options('packages'):
            # Drop saved entries for packages the panel no longer reports
            if clean_dead_packages and package not in list_of_packages:
                self._dh.remove_option('packages', package)
                continue
            package_limits = self._get_package_from_dh(package)
            # Pass package, if limits not well-formed
            if len(package_limits) != 2:
                continue
            db_packages[package] = package_limits[0], package_limits[1]
        # Persist the cleaned data
        if clean_dead_packages:
            self._write_data()
    # Put all panel packages to result dictionary
    self._get_package_to_users_map()
    for package in self._package_to_uids_map.keys():
        if package in db_packages:
            # if package present in cl-quota.dat, take limits
            package_limits_dict[package] = db_packages[package]
        else:
            # No saved limits: report the package with default ('0') limits
            package_limits_dict[package] = ('0', '0')
    return package_limits_dict
def set_user_limit(
    self,
    uid: str,
    soft: Optional[str] = None,
    hard: Optional[str] = None,
    save: bool = True,
    force_save: bool = False,
    only_store: bool = False,
):
    """
    Set limits for users
    * Resolve limits according to those saved in cl-quota.dat
    Limits are resolved in the following order:
    user limits --> package limits --> root user (uid=0) limits
    * Apply new limits when resolved limits differ from those in cache or when `force_save` is True
    * Write updated values to cl-quota.dat if `save` is True
    Always update cl-quota.dat in case both zeroes or both unlimited are provided
    :param uid: user id
    :param soft: soft limit value
    :param hard: hard limit value
    :param save: save limits to cl-quota.dat
    :param force_save: save limits to cl-quota.dat even if they are not changed
    :param only_store: store limits in memory, but do not apply them
    :return: None
    :raises InsufficientPrivilegesException: when not running as root
    """
    self._check_admin()
    # Validate limits value, convert 'unlimited' --> '-1', 'default' --> '0'
    soft_validated, hard_validated = self._preprocess_limit(soft), self._preprocess_limit(hard)
    # Derive limits from cl-quota.dat according to the limits inheritance rules
    soft_resolved, hard_resolved = self._combine_user_limits(uid=uid, soft=soft_validated, hard=hard_validated)
    # Convert limit to format acceptable by setquota utility
    soft_converted, hard_converted = self._convert_for_sys_utility(soft=soft_resolved, hard=hard_resolved)
    # Get data from repquota utility
    cached = self._get_current_quotas(uid)[uid]
    # Run cmd only if quota changed or force_save is True
    # If force_save is True it equals to --save-all-parameters in cloudlinux-limits
    if (soft_converted, hard_converted) != (cached["inodes_soft"], cached["inodes_hard"]) or force_save:
        # Don't apply limits to root user
        if uid != '0':
            device = self._get_home_device(self._fetch_homedir(uid))
            if only_store:
                # Queue a `setquota -b` input line; the byte limits are passed through unchanged
                stdin = f'{uid} {cached["bytes_soft"]} {cached["bytes_hard"]} {soft_converted} {hard_converted}\n'
                self._device_quota[device] = self._device_quota.get(device, '') + stdin
            else:
                # Apply immediately; the byte limits are passed through unchanged
                cmd = [
                    QuotaWrapper.SETQUOTA,
                    '-u', uid, cached['bytes_soft'], cached['bytes_hard'], soft_converted, hard_converted, device,
                ]
                run_command(cmd)
                self._sync_quota_files(device)
        if save:
            soft_user_dat, hard_user_dat = self._get_user_limits_to_save(
                uid, soft_validated, hard_validated, force_save=force_save)
            self._save_user_limits(uid, soft_user_dat, hard_user_dat)
    # Always update cl-quota.dat in case both zeroes or both unlimited are provided
    if (soft_validated == '0' and hard_validated == '0') or (soft_validated == '-1' and hard_validated == '-1'):
        self._save_user_limits(uid, soft_validated, hard_validated)
    # uid 0 limits are the last step of the inheritance chain, so re-apply limits for all users
    if uid == '0':
        self._apply_all_limits()
def set_package_limit(
    self,
    package: str,
    soft: Optional[str] = None,
    hard: Optional[str] = None,
    save: bool = True,
    only_store: bool = False
) -> None:
    """
    Sets limits for package
    :param package: package name
    :param soft: soft limit value
    :param hard: hard limit value
    :param save: save limits to cl-quota.dat
    :param only_store: store limits in memory, but do not apply them
    :return: None
    :raises InsufficientPrivilegesException: when not running as root
    """
    self._check_admin()
    # Validate limits value, convert 'unlimited' --> '-1', 'default' --> '0'
    soft_validated, hard_validated = self._preprocess_limit(soft), self._preprocess_limit(hard)
    # Set limits for empty reseller package
    if save is True \
            and package in self._get_package_quotas(all_packages=True) \
            and package not in self._get_package_to_users_map():
        # Derive limits from cl-quota.dat according to the limits inheritance rules
        soft_resolved, hard_resolved = self._get_saved_package_limits_if_none(
            package, soft_validated, hard_validated)
        self._save_package_limits(package, soft_resolved, hard_resolved)
        return
    # Unknown package: nothing to do
    if not self._check_package_exists(package):
        return
    # Example: {'/dev/sda1': ['502', '504', '515', '521', '501']}
    device_user_map = self._get_device_user_map()
    cached_quotas = self._get_current_quotas()
    for device in device_user_map.keys():
        # `setquota -b` input lines collected for this device
        std_in = []
        for uid in self._get_package_to_users_map(package):
            if uid not in device_user_map[device]:
                continue
            soft_resolved, hard_resolved = self._combine_package_limits(
                uid=uid, package=package, soft=soft_validated, hard=hard_validated)
            soft_converted, hard_converted = self._convert_for_sys_utility(soft=soft_resolved, hard=hard_resolved)
            try:
                soft_cached, hard_cached = cached_quotas[uid]['inodes_soft'], cached_quotas[uid]['inodes_hard']
                # Only queue users whose inode limits actually change
                if (soft_converted, hard_converted) != (soft_cached, hard_cached):
                    std_in.append(
                        f'{uid} '
                        f'{cached_quotas[uid]["bytes_soft"]} {cached_quotas[uid]["bytes_hard"]} '
                        f'{soft_converted} {hard_converted}'
                    )
            except KeyError:
                pass # skip error when quota is on but not configured
        if len(std_in) != 0:
            std_in = ('\n'.join(std_in) + '\n')
            self._device_quota[device] = self._device_quota.get(device, '') + std_in
    if save:
        soft_data, hard_data = self._get_package_limits_to_save(package, soft_validated, hard_validated)
        self._save_package_limits(package, soft_data, hard_data)
    # Write the queued quotas unless the caller wants them kept in memory only
    if not only_store:
        self._flush_device_quota()
def synchronize(self):
    """
    Re-apply the saved limits: queue the limits of every package,
    drop saved records of users that no longer exist, then write
    the queued quotas
    """
    self._check_admin()
    limits_by_package = self._get_all_packages_with_limits(clean_dead_packages=True)
    for package in limits_by_package:
        soft, hard = limits_by_package[package]
        # Nothing is saved here and quotas are only queued; they are flushed below
        self.set_package_limit(package, soft, hard, save=False, only_store=True)
    self._remove_unexisting_users()
    self._flush_device_quota()
def save_user_cache(self):
    """
    Write the current quotas to the cache file so that
    non-privileged users are able to see them

    Each row holds the uid followed by the values of `self._fields`;
    rows are ordered by numeric uid.

    :raises InsufficientPrivilegesException: when not running as root
    """
    self._check_admin()
    # get data from repquota utility
    current_quotas = self._get_current_quotas()
    # form 2d array for writing to file
    cache_content = [
        [uid] + [current_quotas[uid][field] for field in self._fields]
        for uid in sorted(current_quotas, key=int)
    ]
    self._get_global_lock(True)
    try:
        file_handler = self._prepare_writer(QuotaWrapper.CACHEFILE)
        csv_out = csv.writer(file_handler, quoting=csv.QUOTE_MINIMAL)
        csv_out.writerows(cache_content)
        self._end_writer(QuotaWrapper.CACHEFILE)
    finally:
        # Release the lock even when writing fails, as _get_saved_data_handler does
        self._release_lock()
def _apply_all_limits(self, skip_root: bool = True):
"""Set limits for all users. Skip root user if skip_root is True"""
for uid in self._get_uid_to_packages_map().keys():
if uid == '0' and skip_root:
continue
self.set_user_limit(uid, soft=None, hard=None, save=False, only_store=True)
self._flush_device_quota()
def _flush_device_quota(self):
"""Write all device quotas to disk"""
quotas_written: bool = False
for device in self._device_quota.keys():
cmd = [QuotaWrapper.SETQUOTA, '-bu', device]
run_command(cmd, std_in=self._device_quota[device])
quotas_written = True
if quotas_written:
self._sync_quota_files()
self._device_quota = {}
def _sync_quota_files(self, device: str | None = None):
    """
    Run `quotasync` so that inode limits in the kernel match the limits in the user.quota file.
    Without it `repquota` called right after `setquota` may still report the limits that
    existed before the `setquota` call.
    When a device is given and its filesystem is XFS, nothing is run: XFS handles these
    operations in real-time and does not implement what `quotasync` needs, so the call
    would end with an error.
    With no device, all filesystems are synced (`quotasync -a`).
    """
    if device is None:
        target = '-a'
    else:
        if get_filesystem_type(device).lower() == 'xfs':
            return
        target = device
    run_command([QuotaWrapper.QUOTASYNC, target])
def _remove_unexisting_users(self):
    """Remove all records from cl-quota.dat for users which do not exist"""
    if self._dh.has_section('users'):
        for uid in self._dh.options('users'):
            try:
                # Check user presence
                self._fetch_homedir(uid)
            except NoSuchUserException:
                # No passwd entry for this uid: drop its saved limits
                self._dh.remove_option('users', uid)
        # Persist the data after the cleanup
        self._write_data()
def _check_package_exists(self, package):
"""Check whether package exists"""
try:
self._get_package_to_users_map(package)
except NoSuchPackageException:
return False
else:
return True
def _get_user_limits_to_save(
self,
uid: str,
soft_validated: Optional[str],
hard_validated: Optional[str],
force_save: bool = False,
) -> Tuple[Optional[str], Optional[str]]:
"""
Derive package limit values to save to cl-quota.dat
If None passed as limit to method, then replace it by the user's value from cl-quota.dat
Update cl-quota.dat only if the derivation result changes
"""
soft_user_dat, hard_user_dat = self._get_user_limits(uid=uid)
soft_resolved, hard_resolved = self._combine_user_limits(uid=uid, soft=soft_validated, hard=hard_validated)
soft_none_resolved, hard_none_resolved = self._combine_user_limits(uid=uid, soft=None, hard=None)
if soft_resolved != soft_none_resolved or (force_save and soft_validated is not None):
soft_user_dat = soft_validated
if hard_resolved != hard_none_resolved or (force_save and soft_validated is not None):
hard_user_dat = hard_validated
return soft_user_dat, hard_user_dat
def _get_package_limits_to_save(
self, package: str, soft_validated: Optional[str], hard_validated: Optional[str]) -> Tuple[str, str]:
"""Derive package limit values to save to cl-quota.dat"""
p_soft, p_hard = self._get_package_limits(package=package)
# If new value is provided, then update package limit
p_soft = soft_validated if soft_validated is not None else p_soft
p_hard = hard_validated if hard_validated is not None else p_hard
return p_soft, p_hard
def _check_present_panel(self):
"""
Return True if control panel present
"""
if self._panel_present is None:
self._panel_present = 'Unknown' != run_command(['/usr/bin/cldetect', '--detect-cp-nameonly']).rstrip()
return self._panel_present
def _check_admin(self):
'''
Raise exception if no admin user
'''
if self._euid != 0:
raise InsufficientPrivilegesException()
def _get_saved_data_handler(self) -> ConfigParser.ConfigParser:
    '''
    Build the ConfigParser holding the saved quotas
    Reads /etc/container/cl-quotas.dat while the global lock is held
    :raises MalformedConfigException: when the file can not be parsed
    '''
    self._get_global_lock(True)
    parser = ConfigParser.ConfigParser(interpolation=None, strict=False)
    # Keep option names exactly as written in the file (no lowercasing)
    parser.optionxform = str
    try:
        parser.read(QuotaWrapper.DATAFILE)
    except ConfigParser.ParsingError as parse_error:
        raise MalformedConfigException(parse_error)
    finally:
        self._release_lock()
    return parser
def _get_device_user_map(self):
"""
Returns dictionary mapping devices to lists of users
"""
if self._device_user_map is not None:
return self._device_user_map
devices_map = {}
device_user_pairs = []
for uid in self._get_list_of_uids():
try:
device_user_pairs.append((self._get_home_device(self._fetch_homedir(uid)), uid))
except KeyError:
continue
for pair in device_user_pairs:
if pair[0] not in devices_map:
devices_map[pair[0]] = []
devices_map[pair[0]].append(pair[1])
self._device_user_map = devices_map
return self._device_user_map
def _check_limit(self, limit: Optional[str]) -> Optional[str]:
if limit is None or limit == '-1':
return limit
limit_pattern = re.compile(r'(\d+)')
pattern_match = limit_pattern.search(limit)
if not pattern_match:
raise IncorrectLimitFormatException(limit)
return pattern_match.group(1)
def _combine_user_limits(self, uid: str, soft: Optional[str] = None, hard: Optional[str] = None) -> Tuple[str, str]:
"""
Determines user limits by resolving them according to the limits inheritance rule:
user limits ---(overridden by provided as method param if provided not None)--->
---> package limits ---> root user (uid=0) limits
uid: user id
soft: Optional[str] = None: limit value, can be:
* None -- value not passed
* "-1" -- unlimited
* "0" -- default (next value from hierarchy should be considered)
* "1", "2", ... -- precice limit values
hard: soft: Optional[str] = None: limit value, values range the same as for soft
return: Tuple[str, str]: (soft_limit, hard_limit), they can be:
* "-1" -- unlimited
* "1", "2", ... -- precice limit values
"""
if uid == '0':
soft, hard = self._get_user_limits_override_none(uid=uid, soft=soft, hard=hard)
soft, hard = (soft if soft != '0' else '-1', hard if hard != '0' else '-1')
return soft, hard
soft, hard = self._get_user_limits_override_none(uid=uid, soft=soft, hard=hard)
for package in self._get_uid_to_packages_map(uid):
soft, hard = self._get_package_limits_override_default(package=package, soft=soft, hard=hard)
soft, hard = self._get_user_limits_override_default(uid='0', soft=soft, hard=hard)
soft, hard = (soft if soft != '0' else '-1', hard if hard != '0' else '-1')
return soft, hard
def _get_user_limits_override_none(self, uid: str, soft: Optional[str] = None, hard: Optional[str] = None) -> Tuple[str, str]:
"""Get user limits from cl-quota.dat. If limit is None, then override it by user's limit
:param str uid: user id
:param Optional[str] soft: limit value, can be:
* None -- value not passed, should be overridden by user's limit
* "-1" -- unlimited
* "0" -- default (next value from hierarchy should be taken)
* "1", "2", ... -- values
:param Optional[str] hard: limit values same as for soft
:return Tuple[str, str]: derived limits
"""
user_soft, user_hard = self._get_user_limits(uid=uid)
# Override by passed limits values
soft = self._check_limit(limit=soft if soft is not None else user_soft)
hard = self._check_limit(limit=hard if hard is not None else user_hard)
return soft, hard
def _get_user_limits_override_default(self, uid: str, soft: str, hard: str) -> Tuple[str, str]:
"""Get user limits from cl-quota.dat. If limit is default, then override it by user's limit
:param str uid: user id
:param Optional[str] soft: limit value, can be:
* "-1" -- unlimited
* "0" -- default (next value from hierarchy should be taken)
* "1", "2", ... -- values
:param Optional[str] hard: limit values same as for soft
:return Tuple[str, str]: derived limits
"""
user_soft, user_hard = self._get_user_limits(uid=uid)
# Override by passed limits values
soft = self._check_limit(limit=soft if soft != '0' else user_soft)
hard = self._check_limit(limit=hard if hard != '0' else user_hard)
return soft, hard
def _get_user_limits(self, uid: str) -> Tuple[str, str]:
"""Try to get user's limits from cl-quota.dat"""
try:
user_soft, user_hard = self._dh.get('users', uid).split(':')
except (ConfigParser.NoSectionError, ConfigParser.NoOptionError):
user_soft, user_hard = '0', '0'
return user_soft, user_hard
def _get_package_limits_override_default(self, package: str, soft: str, hard: str) -> Tuple[str, str]:
"""Get package limits from cl-quota.dat. If passed limit is default, then override it by package's limit"""
pack_soft, pack_hard = self._get_package_limits(package=package)
soft = self._check_limit(limit=pack_soft if soft == '0' else soft)
hard = self._check_limit(limit=pack_hard if hard == '0' else hard)
return soft, hard
def _get_package_limits(self, package: str) -> Tuple[str, str]:
"""Try to get package's limits from cl-quota.dat"""
try:
soft, hard = self._get_package_from_dh(package)
except (ConfigParser.NoSectionError, ConfigParser.NoOptionError):
soft, hard = '0', '0'
return soft, hard
@staticmethod
def _convert_for_sys_utility(soft: Optional[str], hard: Optional[str]) -> Tuple[Optional[str], Optional[str]]:
"""Converts limits for setquota utility which threats 0 as unlimited"""
if soft == '-1':
soft = '0'
if hard == '-1':
hard = '0'
return soft, hard
def _combine_package_limits(
        self, package: str, uid: str, soft: Optional[str], hard: Optional[str]) -> Tuple[str, str]:
    """
    Determines package limits taking into account saved user and default ones
    :param package: package the limits are being set for
    :param uid: id of a user belonging to the package
    :param soft: new soft limit of the package or None when not passed
    :param hard: new hard limit of the package or None when not passed
    :return: (soft, hard) limits to apply to the user; "-1" means unlimited
    """
    u_soft, u_hard = self._get_user_limits(uid=uid)
    if IS_DA and is_quota_inheritance_enabled():
        # Check the real user's package and save his quotas (instead of setting `DEFAULT` package ones)
        # This is specific to DirectAdmin only
        da_real_package = self._get_da_real_package(uid=uid)
        if da_real_package != package:
            # The user really belongs to another package: its quotas replace the passed ones
            da_real_quotas = self._get_package_quotas(packname=da_real_package, all_packages=True)
            soft = p_soft = da_real_quotas[da_real_package]['inodes_soft']
            hard = p_hard = da_real_quotas[da_real_package]['inodes_hard']
        else:
            p_soft, p_hard = self._get_package_limits(package=package)
    else:
        p_soft, p_hard = self._get_package_limits(package=package)
    # Combine package limits with new package limits
    # If user limit is non-default, take it, otherwise use package limit
    # Override package limit by the new one if it's possible
    soft = u_soft if u_soft != '0' else (soft if soft is not None else p_soft)
    hard = u_hard if u_hard != '0' else (hard if hard is not None else p_hard)
    # If package limits absent, use default limits
    soft, hard = self._get_user_limits_override_default(uid='0', soft=soft, hard=hard)
    # A value still default at this point means unlimited
    soft, hard = (soft if soft != '0' else '-1', hard if hard != '0' else '-1')
    return soft, hard
def _get_saved_package_limits_if_none(self, package, soft=None, hard=None):
"""
Applies saved package limits if none has been passed
"""
try:
pack_soft, pack_hard = self._get_package_from_dh(package)
if soft is None and pack_soft != '0':
soft = pack_soft
if hard is None and pack_hard != '0':
hard = pack_hard
except (ConfigParser.NoSectionError, ConfigParser.NoOptionError):
pass
soft = self._check_limit(soft)
hard = self._check_limit(hard)
return soft, hard
def _get_da_real_package(self, uid: str) -> str:
    """
    Look up the real package name of a DirectAdmin user
    :param str uid: user id
    :return str: package name reported by clcontrollib for the user
    """
    user_names = ClPwd().get_names(int(uid))
    username = user_names[0]
    panel = clcontrollib.DirectAdmin()
    return panel._get_user_package(username)
def _get_current_quotas(self, uid=None):
"""
Retrieves current quotas.
If euid == 0, use data from repquota utility, else from /etc/container/cl-quotas.cache file
"""
if self._euid != 0:
return self._load_user_cache()
if not self._quota:
# Retrieves quotas from repquota utility
self._quota = self._load_current_quotas()
if uid:
try:
return {uid: self._quota[uid]}
except KeyError:
self._check_if_quota_enabled(uid)
raise NoSuchUserException(uid)
return self._quota
def _get_package_quotas(self, packname=None, all_packages=False):
    """
    Prepare package limits data for output
    (call only from get_package_limits/get_all_packages_limits - main)
    :param packname: Package name to get limits for. If present, only this
        package is returned, else - all packages
    :param all_packages: If False only used and admin's packages are read, True - all packages
        (including reseller packages without users)
    :return Dictionary of package limits:
        {package_name: {'inodes_used': 'xxx', 'inodes_soft': 'yyy', 'inodes_hard': 'zzz'}}
    :raises NoSuchPackageException: when `packname` is not among the packages
    """
    if all_packages:
        # Get list of all packages
        package_names = self._get_all_package_list()
    else:
        # Get list of used packages + all admin's packages
        package_names = self._get_list_of_packages()
    quotas = {}
    for package in package_names:
        try:
            if package == VE_DEFAULT_PACKAGE:
                # "VE_DEFAULT" is not a real package, it just uses
                # the limits of LVE == 0, so read them from there
                saved_value = self._dh.get('users', '0')
            else:
                saved_value = self._dh.get('packages', package)
            soft, hard = saved_value.split(':')
            soft = self._check_limit(soft)
            hard = self._check_limit(hard)
            # Unlimited is shown as '-'; so is the "used" column
            values = ['-', '-' if soft == '-1' else soft, '-' if hard == '-1' else hard]
        except (ConfigParser.NoSectionError, ConfigParser.NoOptionError):
            values = ['-', '0', '0']
        quotas.update(self._populate(package, values))
    if not packname:
        return quotas
    try:
        return {packname: quotas[packname]}
    except KeyError:
        raise NoSuchPackageException(packname)
def _populate(self, item, data):
return {item: dict(list(map((lambda x: (x[1], data[x[0]])), enumerate(self._fields[3:]))))}
def _get_list_of_packages(self):
return list(self._get_package_to_users_map().keys())
def _get_list_of_uids(self):
return list(self._get_uid_to_packages_map().keys())
def _get_package_to_users_map(self, package=None):
if not self._package_to_uids_map:
self._package_to_uids_map = self._load_package_uids_data()
if package:
try:
return self._package_to_uids_map[package]
except KeyError:
raise NoSuchPackageException(package)
return self._package_to_uids_map
def _check_if_quota_enabled(self, uid):
if uid in self._quota_enabled_list:
return
home_dir = self._fetch_homedir(uid)
quota_disabled_message = check_quota_enabled(path=home_dir)
if quota_disabled_message:
raise UserQuotaDisabledException(uid=uid, homedir=home_dir, message=quota_disabled_message)
else:
self._quota_enabled_list.append(uid)
def _get_uid_to_packages_map(self, uid=None):
if not self._uid_to_packages_map:
self._package_to_uids_map = self._load_package_uids_data()
if uid:
try:
return self._uid_to_packages_map[uid]
except KeyError:
raise NoSuchUserException(uid)
return self._uid_to_packages_map
def _get_packages_uids_from_cpapi(self) -> Dict[str, List[str]]:
    """
    Build the package-uids map from cpapi data. Only for custom panels. See LU-610 for details.
    Users whose package is None are put into the VE_DEFAULT_PACKAGE package.
    self._uid_to_packages_map is rebuilt as a side effect.
    :return: Dictionary with data.
    Example response:
        {'default': ['1038', '1043', '1046'], 'res1_pack1': ['1044'], 'pack1': ['1042']}
    Corresponding self._uid_to_packages_map value:
        {'1038': ['default'], '1042': ['pack1'], '1043': ['default'], '1044': ['res1_pack1'], '1046': ['default']}
    :raises ExternalProgramFailed: when users or admin packages can not be retrieved
    """
    try:
        users_packages = list_users()
    except (OSError, CPAPIExternalProgramFailed, EncodingError) as e:
        raise ExternalProgramFailed('%s. Can not get users' % (str(e)))
    # Example of users_packages:
    # {1000: {'reseller': 'root', 'package': 'Package1'},
    #  1001: {'reseller': 'res1', 'package': 'BusinessPackage'},
    #  1002: {'reseller': 'root', 'package': None}}
    packages_users = defaultdict(list) # type: Dict[str, List[str]]
    uid_packages = defaultdict(list) # type: Dict[str, List[str]]
    self._uid_to_packages_map = uid_packages
    for uid, uid_data in users_packages.items():
        s_uid = str(uid)
        package = uid_data['package']
        if package is None:
            package = VE_DEFAULT_PACKAGE
        packages_users[package].append(s_uid)
        uid_packages[s_uid].append(package)
    try:
        admin_pkgs = admin_packages(raise_exc=True)
    except (OSError, CPAPIExternalProgramFailed) as e:
        raise ExternalProgramFailed('%s. Can not get admin packages' % (str(e)))
    # Admin packages without users are still listed, with an empty uid list
    for package in admin_pkgs:
        name = VE_DEFAULT_PACKAGE if package is None else package
        packages_users.setdefault(name, [])
    packages_users.setdefault(VE_DEFAULT_PACKAGE, [])
    return packages_users
def _load_package_uids_data(self) -> Dict[str, List[str]]:
"""
Gets map of packages and users
:rtype dict
:return Dictionary with data. Example:
{'default': ['1038', '1043', '1046'], 'res1_pack1': ['1044'], 'pack1': ['1042']}
"""
packages = {}
if self._euid != 0:
return packages
# if packages not supported all user has 'VE_DEFAULT' package
if not self._check_present_panel():
packages[VE_DEFAULT_PACKAGE] = list(map(str, _get_users_list()))
self._uid_to_packages_map = {i: [VE_DEFAULT_PACKAGE] for i in packages[VE_DEFAULT_PACKAGE]}
return packages
return self._get_packages_uids_from_cpapi()
def _get_all_package_list(self):
"""
Retrives all (root and resellers) panel package list
:return: List of package names
"""
# If list already loaded - do nothing
if self._all_package_list:
return self._all_package_list
try:
self._all_package_list = []
list_admin_packages = admin_packages(raise_exc=True)
for package in list_admin_packages:
self._all_package_list.append(package)
except (OSError, CPAPIExternalProgramFailed) as e:
raise ExternalProgramFailed('%s. Can not get admin packages' % (str(e)))
try:
dict_resellers_packages = resellers_packages(raise_exc=True)
for packages_list in dict_resellers_packages.values():
for package in packages_list:
self._all_package_list.append(package)
except (OSError, CPAPIExternalProgramFailed) as e:
raise ExternalProgramFailed('%s. Can not get reseller packages' % (str(e)))
# Add 'VE_DEFAULT' package to list
if VE_DEFAULT_PACKAGE not in self._all_package_list:
self._all_package_list.append(VE_DEFAULT_PACKAGE)
return self._all_package_list
def _convert_data_to_tuples(self, data):
'''
Convert dict to tuples for passing to printing routines
'''
for key in data.keys():
try:
entry = tuple(map((lambda x: (x, data[key][x])), self._fields[3:]))
data[key] = entry
except KeyError:
continue
return data
def _load_current_quotas(self):
    """
    Gets current quota settings from repquota utility for further processing
    Parses the output of `repquota -una` line by line:
    * lines starting with '***' name the device the following rows belong to
    * lines starting with '#' are per-user rows ('#<uid> ...')
    * lines mentioning 'grace' carry block/inode grace times, stored in self._grace
    :return: {uid: {field: value}} plus the entries added by _add_default()
    """
    q = {}
    device = None
    devices = self._devices
    cmd = [QuotaWrapper.REPQUOTA, '-una']
    data = run_command(cmd)
    grace_regex_pattern = re.compile(r'(block|inode)\sgrace\stime:?\s(\d[\w:]+)(?:;|$|\s)', re.IGNORECASE)
    for line in data.splitlines():
        if line.startswith('#'):
            # User rows seen before any device header can not be attributed
            if not device:
                continue
            parts = line.split()
            if len(parts) != 8:
                # Keep the first two columns and only the purely numeric ones after them
                parts = self._remove_redundant_fields_from_input(parts)
            # Strip the leading '#'
            uid = parts[0][1:]
            if uid == '0': # We do not want to limit root :)
                continue
            try:
                if device not in devices:
                    device = self._find_unknown_device(device)
                # Only the row of the device holding the user's home directory is kept
                if device in devices and self._is_home_device(self._fetch_homedir(uid), device):
                    q[uid] = dict(list(map((lambda x: (self._fields[x[0]], x[1])), enumerate(parts[2:]))))
            except (KeyError, IndexError, NoSuchUserException):
                continue
        elif line.startswith('***'):
            # Header line: the device name starts at '/dev'
            device = line[line.find('/dev'):].strip()
        elif 'grace' in line:
            found = grace_regex_pattern.findall(line)
            if found:
                # Keys are 'block' / 'inode' in lower case
                self._grace.update(dict(list(map((lambda x: (x[0].lower(), x[1])), found))))
    q.update(self._add_default())
    return q
def _remove_redundant_fields_from_input(self, parts):
stripped_parts = parts[:2]
is_digit_pattern = re.compile(r'^\d+$')
stripped_parts.extend(
[field for field in parts[2:] if is_digit_pattern.search(field)])
return stripped_parts
def _fetch_homedir(self, uid):
if len(self._uid_to_homedir_map) == 0:
self._uid_to_homedir_map.update({str(entry.pw_uid): entry.pw_dir for entry in pwd.getpwall()})
try:
return self._uid_to_homedir_map[uid]
except KeyError:
raise NoSuchUserException(uid)
def _load_quota_devices(self):
    """
    Read the mount table and group the mountpoints by device.

    Every line of ``QuotaWrapper.PROC_MOUNTS`` except the 'rootfs /' one
    yields a record with the mountpoint and, when the matching mount
    options are present, the quota file name and the quota format.
    Example of returned data structure:
    {'/dev/mapper/VolGroup-lv_root': [
        {'mountpoint': '/', 'quota_file': 'quota.user', 'quota_type': 'vfsv0'},
        {'mountpoint': '/var', 'quota_file': 'quota.user', 'quota_type': 'vfsv0'}
      ],
     '/dev/mapper/VolGroup-lv_root2': [
        {'mountpoint': '/', 'quota_file': 'quota.user', 'quota_type': 'vfsv0'},
        {'mountpoint': '/var', 'quota_file': 'quota.user', 'quota_type': 'vfsv0'}
      ]
    }

    Raises QuotaDisabledException when no device was found at all.
    """
    devices = {}  # type: Dict[str, List[Dict[str, str]]]
    split_patt = re.compile(r' |,')
    # 'usrjquota=' is the option that accompanies 'jqfmt=' for journaled
    # quota; 'usruota=' is the historical misspelling, kept so that nothing
    # that matched before stops matching.
    quota_file_options = ('usrquota=', 'usrjquota=', 'usruota=')
    # the context manager closes the stream even if parsing fails
    with open(QuotaWrapper.PROC_MOUNTS) as proc_mounts_stream:
        for line in proc_mounts_stream:
            if line.startswith('rootfs /'):
                continue
            fields = split_patt.split(line.rstrip('\n'))
            if len(fields) < 2:
                # blank or truncated line: no mountpoint to record
                continue
            device = fields[0]
            mountpoint_data = {'mountpoint': fields[1]}
            for field in fields:
                if field.startswith(quota_file_options):
                    mountpoint_data['quota_file'] = field.split('=', 1)[1]
                elif field.startswith('jqfmt='):
                    mountpoint_data['quota_type'] = field.split('=', 1)[1]
            devices.setdefault(device, []).append(mountpoint_data)
    if not devices:
        # TODO: this only can happen when system HAS NO MOUNTS AT ALL
        raise QuotaDisabledException()
    return devices
def _load_user_cache(self):
    """
    Load the limits of the current user from the cache file.

    Used for a non-privileged user. The cache is a comma separated file
    whose rows start with a uid followed by one value per entry of
    ``self._fields``. The file is read while the global lock is held and
    is closed before returning.

    Returns ``{uid: {field: value}}`` for ``self._euid``. When the file
    can not be opened or read, or holds no usable row for the user, every
    field is reported as '0'.
    """
    uid = str(self._euid)
    fields = self._fields
    zero_limits = {uid: dict.fromkeys(fields, '0')}
    try:
        self._get_global_lock()
        with open(QuotaWrapper.CACHEFILE) as cache_file:
            for row in csv.reader(cache_file, delimiter=','):
                if not row or row[0] != uid:
                    continue
                values = row[1:len(fields) + 1]
                if len(values) < len(fields):
                    # We want to prevent crazy cases like misedited cache file
                    break
                return {uid: dict(zip(fields, values))}
    except (OSError, IOError):
        # We don't want to confuse a panel with error messages.
        # Let the data be zeroes until they arrive
        return zero_limits
    finally:
        self._release_lock()
    return zero_limits
def _get_mountpoint_device_map(self, devices) -> List[Tuple[str, str]]:
"""
return list tuple ('mountpoin tpath', 'device') reverse sorted by deep mountpoint path
[('/mountpoint_path/path', '/device'), ('/mountpoint_path', '/device')]
"""
def sort_by_deep_path(device_mountpoint):
if device_mountpoint[0] == '/':
deep_path = 0
else:
deep_path = device_mountpoint[0].count('/')
return deep_path
mountpoint_device_map = []
for device, mountpoint_data_list in devices.items():
for mountpoint_data in mountpoint_data_list:
mountpoint_path = mountpoint_data['mountpoint']
mountpoint_device_map.append((mountpoint_path, device))
mountpoint_device_map.sort(key=sort_by_deep_path, reverse=True)
return mountpoint_device_map
def _get_home_device(self, home):
"""
Returns device user homedir is on
"""
def _add_slash(path):
if path and path[-1] != '/':
path += '/'
return path
dirname = _add_slash(os.path.dirname(home))
for mounpoint_path, device in self._mountpoint_device_mapped:
if dirname.startswith(_add_slash(mounpoint_path)):
return device
def _is_home_device(self, home, device):
"""
Checks if a device is user homedir device
"""
return self._get_home_device(home) == device
def _find_unknown_device(self, device):
try:
dev = os.stat(device)[ST_DEV]
dev_to_find = (os.major(dev), os.minor(dev))
for current_device in self._devices.keys():
dev = os.stat(current_device)[ST_DEV]
if dev_to_find == (os.major(dev), os.minor(dev)):
return current_device
except OSError:
return device
def _add_default(self):
"""
Insert 'default' quota.
Calls only from _load_current_quotas, after parsing repquota's output
"""
values = ['-', '0', '0', '-']
try:
user_soft, user_hard = self._dh.get('users', '0').split(':')
# Replace -1 to 0 for set unlimited limit
if user_soft == '-1':
user_soft = '0'
if user_hard == '-1':
user_hard = '0'
values.extend([user_soft, user_hard])
except (ConfigParser.NoSectionError, ConfigParser.NoOptionError):
values.extend(['0', '0'])
return {'0': dict(list(map((lambda x: (x[1], values[x[0]])), enumerate(self._fields))))}
def _save_user_limits(self, uid, soft, hard):
"""
Saves user limits
"""
if soft is None:
soft = '0'
if hard is None:
hard = '0'
# remove user limits if they are equal to default
if soft == '0' and hard == '0' and self._dh.has_section('users'):
self._dh.remove_option('users', uid)
else:
if not self._dh.has_section('users'):
self._dh.add_section('users')
self._dh.set('users', uid, '%s:%s' % (soft, hard))
self._write_data()
def _save_package_limits(self, package, soft, hard):
"""
Saves package limits
"""
if soft is None:
soft = '0'
if hard is None:
hard = '0'
if soft == '0' and hard == '0' and self._dh.has_section('packages'):
self._dh.remove_option('packages', package)
else:
if not self._dh.has_section('packages'):
self._dh.add_section('packages')
self._dh.set('packages', package, '%s:%s' % (soft, hard))
self._write_data()
self._copy_package_limits_to_cpanel(package)
def _copy_package_limits_to_cpanel(self, package):
    """
    Copy package quota limits from cl-quotas.dat to cpanel packages data.

    Nothing is done when the panel is not cPanel, when the package file
    has no lines, when the package has no lve extension, or when the
    inode limits stored in the package file already equal the current
    ones. Otherwise the package file is rewritten with fresh
    'lve_inodes_soft' / 'lve_inodes_hard' lines appended.
    """
    if not cldetectlib.is_cpanel():
        return  # panel is not cPanel
    package_file = f'/var/cpanel/packages/{package}'
    package_lines = get_file_lines(package_file)
    if len(package_lines) == 0:
        return  # no cPanel package found
    previous_limits, new_lines = self._parse_cpanel_package_data(package_lines)
    if previous_limits is None and new_lines is None:
        return  # no lve extension in package
    quotas = self._get_package_quotas(package, all_packages=True)[package]
    # unlimited quotas for package are indicated as '-',
    # but in package we want to write '-1'
    for name in quotas:
        if quotas[name] == '-':
            quotas[name] = '-1'
    limit_names = ('inodes_soft', 'inodes_hard')
    stored = tuple(previous_limits.get(name, '0') for name in limit_names)
    wanted = tuple(quotas[name] for name in limit_names)
    # don't rewrite cpanel package file if new quotas for package are the same
    if stored == wanted:
        return
    new_lines.extend(['lve_%s=%s\n' % (name, quotas[name]) for name in limit_names])
    write_file_lines(package_file, new_lines, 'w')
@staticmethod
def _parse_cpanel_package_data(cpanel_package_lines):
"""
Process cpanel_package_lines - get values of all old lve_ limits
and remove lines with limits that would be changed
"""
cpanel_package_lines_modified = cpanel_package_lines[:]
old_cpanel_data = {}
for line in cpanel_package_lines:
if line.startswith('lve_'):
line_parts = line.strip().split('=')
limit_name = line_parts[0].replace('lve_', '').strip()
if line_parts[1] != 'DEFAULT':
old_cpanel_data[limit_name] = line_parts[1]
if limit_name in ('inodes_soft', 'inodes_hard'):
cpanel_package_lines_modified.remove(line)
if line.startswith('_PACKAGE_EXTENSIONS') and 'lve' not in line:
return None, None
return old_cpanel_data, cpanel_package_lines_modified
def _save_data(self, soft, hard, item, item_type):
'''
Saves data to a file
'''
if soft == '0' and hard == '0':
try:
self._dh.remove_option(item_type, item)
except ConfigParser.NoSectionError:
pass
else:
if not self._dh.has_section(item_type):
self._dh.add_section(item_type)
self._dh.set(item_type, item, '%s:%s' % (soft, hard))
self._write_data()
def _prepare_writer(self, filepath):
"""
Open temporary file for writing and return file object
"""
path = os.path.dirname(filepath)
try:
fd, temp_path = tempfile.mkstemp(prefix='lvetmp_', dir=path)
file_handler = os.fdopen(fd, 'w')
self._tmp = temp_path
return file_handler
except (IOError, OSError):
if os.path.exists(temp_path):
os.unlink(temp_path)
raise GeneralException("Could not save data")
def _end_writer(self, path):
'''
Routines after writing to file
'''
try:
mask = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH
os.rename(self._tmp, path)
os.chmod(path, mask)
except OSError:
pass
def _write_data(self):
    """
    Actual place of saving data to a file.

    Takes the global lock in write mode, dumps ``self._dh`` into a
    temporary file and moves that file over ``QuotaWrapper.DATAFILE``.
    The temporary file is closed before it is moved, so the data file
    never appears with part of its content still buffered.
    ``_release_lock`` is called even when writing fails.
    """
    self._get_global_lock(True)
    try:
        with self._prepare_writer(QuotaWrapper.DATAFILE) as file_handler:
            self._dh.write(file_handler)
        self._end_writer(QuotaWrapper.DATAFILE)
    finally:
        self._release_lock()
##########################
# File lock functions
def _get_global_lock(self, write=False):
    """
    Take an exclusive flock on the lock file shared through QuotaWrapper.

    With ``write`` set, ``QuotaWrapper.LOCK_WRITE`` is switched on first.
    The lock file is opened only when ``QuotaWrapper.LOCK_FD`` is still
    None.

    Raises GeneralException when the lock file can not be opened or the
    lock can not be taken.
    """
    if write:
        QuotaWrapper.LOCK_WRITE = True
    if QuotaWrapper.LOCK_FD is None:
        try:
            lock_stream = open(QuotaWrapper.LOCK_FILE, 'r')
        except (IOError, OSError):
            raise GeneralException("Can't open lock file for reading")
        QuotaWrapper.LOCK_FD = lock_stream
    try:
        fcntl.flock(QuotaWrapper.LOCK_FD.fileno(), fcntl.LOCK_EX)
    except IOError:
        raise GeneralException("Can't get lock")
def _release_lock(self):
    """
    Close the shared lock file object unless the write flag is set.

    Nothing happens while ``QuotaWrapper.LOCK_WRITE`` is true or when
    ``QuotaWrapper.LOCK_FD`` is None; otherwise the file object is closed
    and ``QuotaWrapper.LOCK_FD`` is reset to None.
    """
    if (not QuotaWrapper.LOCK_WRITE) and (QuotaWrapper.LOCK_FD is not None):
        # NOTE(review): closing the file presumably also drops the flock
        # taken in _get_global_lock - confirm no duplicate descriptor exists
        QuotaWrapper.LOCK_FD.close()
        QuotaWrapper.LOCK_FD = None