# coding=utf-8
#
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
import logging
from subprocess import CalledProcessError, Popen, PIPE
from typing import Dict, AnyStr, Optional, List
from clcommon.evr_utils import serialize_evr
from clcommon.utils import is_ubuntu
app_logger = logging.getLogger('cloudlinux-summary.get_rpm_packages_info')
def get_rpm_packages_info() -> List[Optional[Dict[AnyStr, Optional[AnyStr]]]]:
"""
Get full info about all of rpm packages:
- name
- epoch
- version
- release
- arch
- serialized_version
"""
result = []
rpm_db_error_pattern = 'Thread died in Berkeley DB library'
is_ubuntu_os = is_ubuntu()
if is_ubuntu_os:
rpm_cmd = "dpkg-query -f '${Package}\t${Version}\t${Architecture}\n' -W"
else:
rpm_cmd = "rpm -qa --queryformat '%{name}\t%|epoch?{%{epoch}}:{None}|\t%{version}\t%{release}\t%{arch}\n'"
error_message = 'Can\'t get information about rpm packages, because'
rpm_db_warn_msg = 'Server has broken rpmdb. We can\'t get ' \
'statistics about rpm packages and skip its getting.'
returncode = 0
try:
with Popen(
rpm_cmd,
stdout=PIPE,
stderr=PIPE,
shell=True,
executable='/bin/bash',
text=True,
) as proc:
stdout, stderr = proc.communicate()
returncode = proc.returncode
# We shouldn't send event to Sentry if rpmdb is broken,
# because we can't do anything for getting statistics about rpm packages
# and can only skip this case
if rpm_db_error_pattern in stderr:
app_logger.warning(
rpm_db_warn_msg,
)
return result
except (CalledProcessError, OSError) as exception:
app_logger.exception(
'%s exception "%s',
error_message,
exception
)
return result
if returncode != 0:
app_logger.error(
'%s command "%s" return non-zero code "%s"',
error_message,
rpm_cmd,
returncode,
extra={
'stdout': stdout,
'stderr': stderr,
}
)
return result
lines = stdout.strip().split('\n')
for line in lines:
try:
name, epoch, version, release, arch = parse_package_manager_output(line, is_ubuntu_os)
except ValueError:
app_logger.error(
'The result of call "%s" has an invalid line "%s". '
'It should contain five elements.',
rpm_cmd,
line,
extra={
'stdout': stdout,
}
)
continue
epoch = None if epoch == 'None' else epoch
result.append({
'name': name,
'epoch': epoch,
'version': version,
'release': release,
'arch': arch,
'serialized_version': serialize_evr([
epoch,
version,
release,
])
})
return result
def parse_package_manager_output(line, is_ubuntu_os):
"""
rpm -qa already returns data in needed format, e.g:
lve-utils None 6.2.3 2.el7.cloudlinux.1639593336.cloudlinux.1639595623 x86_64
but, dpkg-query output needed to be parsed a bit, cause version column cannot be split by util
to epoch:version:release
lve-utils 6.2.2.1639220776 amd64
"""
if is_ubuntu_os:
name, version, arch = line.split('\t')
# deb package version [epoch:]upstream_version[-debian_revision]
epoch = None
if ':' in version:
epoch, version = version.split(':')
version, *release = version.split('-')
release = '-'.join(release) if release else None
else:
name, epoch, version, release, arch = line.split('\t')
return name, epoch, version, release, arch