"""
Common logging, Sentry reporting and config parsing code for tools such as
- /opt/imunify360/venv/bin/imunify360_pam.py
- /opt/imunify360/venv/bin/pam_imunify_daemon-watchdog.py
- /opt/imunify360/venv/bin/pamsock_ready_check.py
"""
from configparser import ConfigParser
from functools import lru_cache, wraps
import json
import logging.config
import os
from pathlib import Path
import subprocess
import sys
import time
from typing import Dict
import urllib.request
import sentry_sdk
# Path of the INI-style pam_imunify configuration file read by
# pam_imunify_config().
CONFIG = '/etc/pam_imunify/i360.ini'
# DSN passed to sentry_sdk.init() by _sentry_init().
SENTRY_DSN = 'https://576732f0a25446c4b1f5b4a706e6b9d5@sentry.cloudlinux.com/52'
# Module-level logger; the helpers below use it to report their own errors,
# including ones that happen before logger_init() has configured handlers.
logger = logging.getLogger(__name__)
@lru_cache(1)
def pam_imunify_config() -> Dict[str, str]:
    """Return the settings found in the CONFIG file as a mapping.

    The file has no section header, so a synthetic "[-]" header is
    prepended and "-" is used as the parser's default section; the
    returned object is that section.

    The result is memoised by lru_cache, so the file is read at most once
    per process. Any failure while reading or parsing is logged and a
    built-in fallback mapping (containing only the 'sentry' key) is
    returned instead of raising.
    """
    fallback = {
        'sentry': 'https://sentry.cloudlinux.com/sentry/i360-pam-imunify/',
    }
    try:
        raw_text = Path(CONFIG).read_text()
        parser = ConfigParser(default_section='-')
        parser.read_string("[-]\n" + raw_text)
        return parser['-']
    except Exception:
        # Stay robust: the failure is logged here (and can still reach
        # sentry) even when it happens before logger_init() was called.
        logger.exception("%s parsing error", CONFIG)
        return fallback
def returnvalue_filecaching(filepath: str, timeout: float = 24 * 60):
    """Decorator factory caching a zero-argument function's result in a file.

    The decorated function must take no arguments and return a ``str``
    (the value is stored with ``Path.write_text`` and restored with
    ``Path.read_text``).

    Args:
        filepath: Location of the cache file.
        timeout: Maximum age of the cache file in seconds, measured against
            its modification time. The default, ``24 * 60``, is 1440
            seconds, i.e. 24 minutes.
            NOTE(review): if a 24 hour lifetime was intended, the default
            should be ``24 * 60 * 60`` -- confirm before changing.

    Returns:
        A decorator. The wrapped callable returns the cached text while the
        cache file is fresh; otherwise it calls the original function,
        tries to store the result and returns it. Cache read/write errors
        are logged and never propagated; exceptions raised by the original
        function itself do propagate.
    """
    def decorator(fun):
        @wraps(fun)
        def wrapper():
            path = Path(filepath)
            try:
                # A single stat() call avoids the exists()/stat() race where
                # the file vanishes between the two checks.
                is_fresh = time.time() - timeout < path.stat().st_mtime
            except OSError:
                # Missing or inaccessible cache file: treat it as stale.
                is_fresh = False

            if is_fresh:
                try:
                    return path.read_text()
                except Exception:
                    logger.exception("%s read error", filepath)
                    return fun()

            rv = fun()
            try:
                path.write_text(rv)
            except Exception:
                logger.exception("%s write error", filepath)
            # Returned after the try block rather than from a ``finally``
            # clause, so KeyboardInterrupt/SystemExit are not swallowed.
            return rv
        return wrapper
    return decorator
class _SentryTags:
    """Namespace of zero-argument callables producing Sentry tag values.

    _sentry_init() enumerates every attribute of this class whose name does
    not start with an underscore and calls it, so the set of public
    attributes defined here is exactly the set of reported tags.
    """

    @staticmethod
    @lru_cache(1)
    def name():
        """Return the file name of the running tool.

        If sys.argv[0] is a symlink, the name of its (directly read) link
        target is used instead. The result is memoised.
        """
        script = Path(sys.argv[0])
        resolved = Path(os.readlink(sys.argv[0])) if script.is_symlink() else script
        return resolved.name

    @staticmethod
    def os_version():
        """Return PRETTY_NAME from /etc/os-release, without double quotes.

        When the file does not exist, 'CloudLinux/CentOS 6' is returned.
        """
        release_file = Path('/etc/os-release')
        if release_file.exists():
            # os-release has no section header; add a synthetic one so that
            # ConfigParser accepts it.
            parser = ConfigParser(default_section='-')
            parser.read_string("[-]\n" + release_file.read_text())
            return parser['-']['PRETTY_NAME'].strip('"')
        return 'CloudLinux/CentOS 6'

    @staticmethod
    @lru_cache(1)
    def server_id():
        """Return the 'id' field of the license file, or 'N/A' if it is missing.

        The result is memoised.
        """
        try:
            with open('/var/imunify360/license.json') as fp:
                return json.load(fp)['id']
        except FileNotFoundError:
            return 'N/A'

    @staticmethod
    @returnvalue_filecaching('/var/cache/imunify360-pam_ipify.org')
    def server_ip():
        """Return the response body of api.ipify.org as text (file-cached)."""
        # HTTP currently is about 0.5 sec faster than HTTPS
        with urllib.request.urlopen("http://api.ipify.org", timeout=2) as response:
            payload = response.read()
        return payload.decode()

    @classmethod
    def user(cls):
        """Return the value reported as the Sentry user: the server id."""
        return cls.server_id()

    @staticmethod
    def version():
        """Return the installed imunify360-pam package version.

        rpm is queried first, then dpkg-query. If neither succeeds, the
        text of the last error is returned instead.
        """
        package_queries = (
            ['/bin/rpm', '-q', '--queryformat=%{VERSION}-%{RELEASE}', 'imunify360-pam'],
            ['/usr/bin/dpkg-query', '--showformat=${Version}', '--show', 'imunify360-pam'],
        )
        for argv in package_queries:
            try:
                output = subprocess.check_output(argv, text=True)
            except (FileNotFoundError, subprocess.CalledProcessError) as exc:
                last_err = str(exc)
            else:
                return output.strip()
        return last_err
def getLogger():
    """Return the logger whose name is the running tool's file name."""
    tool_name = _SentryTags.name()
    return logging.getLogger(tool_name)
def _sentry_init():
    """Initialise sentry_sdk and return a dictConfig handler description.

    Reporting is enabled only when the 'sentry' config option exists and
    its value starts with 'http'; otherwise a NullHandler description is
    returned and sentry_sdk is left untouched.

    When enabled, every public attribute of _SentryTags is called and its
    result is attached to the Sentry scope: 'user' becomes the scope user
    id, 'os_version' is stored as the 'os.version' tag and everything else
    is stored as a tag under its own name. A failing tag provider is logged
    and the exception text is used as the tag value.
    """
    config = pam_imunify_config()
    # Sentry error reporting (must be an url or "off")
    reporting_enabled = ('sentry' in config and
                         config['sentry'].startswith('http'))
    if not reporting_enabled:
        return {
            'level': 'NOTSET',
            'class': 'logging.NullHandler',
        }

    sentry_sdk.init(
        dsn=SENTRY_DSN,
        # release=config.Core.VERSION,
        attach_stacktrace='on')

    with sentry_sdk.configure_scope() as scope:
        tag_names = [attr for attr in dir(_SentryTags)
                     if not attr.startswith('_')]
        for tag in tag_names:
            provider = getattr(_SentryTags, tag)
            try:
                value = provider()
            except Exception as exc:
                # It is ok to use an uninitialized logger to print stderr
                # messages
                logger.exception("sentry_init() '%s' tag error", tag)
                # Stay robust: report the error text as the tag value
                value = str(exc)

            if tag == 'user':
                scope.user = {'id': value}
                continue
            scope.set_tag('os.version' if tag == 'os_version' else tag, value)

    return {
        'level': 'ERROR',
        'class': 'sentry_sdk.integrations.logging.SentryHandler'
    }
def _pamlog_handler():
    """Return the dictConfig description of the pam log FileHandler.

    The log file path comes from the 'log' config option and defaults to
    '/var/log/imunify360/pam.log'. The parent directory of the log file is
    created (mode 0o700, including missing parents) if it does not exist.
    """
    log_path = pam_imunify_config().get('log',
                                        '/var/log/imunify360/pam.log')
    log_dir = Path(log_path).parent
    log_dir.mkdir(mode=0o700, parents=True, exist_ok=True)
    return {
        'level': 'INFO',
        'class': 'logging.FileHandler',
        'formatter': 'pam_log',
        'filename': log_path,
    }
def logger_init(console_logfmt='[%(levelname)s] %(message)s',
                console_stream='sys.stderr'):
    """Configure root logging for the running tool and return its logger.

    Two handlers are attached to the root logger: the pam log file handler
    built by _pamlog_handler() and a console handler.

    Args:
        console_logfmt: %-style logging format used for console output.
        console_stream: Dotted name of the stream object for console output
            (resolved by dictConfig through the 'ext://' prefix), or None
            to replace the console handler with a NullHandler.

    Returns:
        The logger returned by getLogger().
    """
    # The command line and the tool name are spliced into a %-style logging
    # format string. A literal '%' in either of them would otherwise be
    # interpreted as a conversion specifier and break the formatting of
    # every pam_log record, so it has to be doubled.
    cmdline = ' '.join(sys.argv).replace('%', '%%')
    tool_name = _SentryTags.name().replace('%', '%%')
    pam_log_format = (
        'time="%(asctime)s" level=%(levelname)s msg="%(message)s" '
        'cmdline="{cmdline}" '
        'tool={tool_name}'
    ).format(cmdline=cmdline, tool_name=tool_name)

    if console_stream is not None:
        console_handler = {
            'level': 'INFO',
            'formatter': 'console_log',
            'class': 'logging.StreamHandler',
            'stream': 'ext://%s' % console_stream,
        }
    else:
        console_handler = {
            'level': 'NOTSET',
            'class': 'logging.NullHandler',
        }

    logging.config.dictConfig({
        'version': 1,
        'handlers': {
            # 'sentry': _sentry_init(),
            'pam_log': _pamlog_handler(),
            'console': console_handler,
        },
        'root': {
            'level': 'NOTSET',
            'handlers': [
                'console',
                'pam_log',
                # 'sentry'
            ]
        },
        'formatters': {
            'console_log': {'format': console_logfmt},
            'pam_log': {
                'format': pam_log_format,
                'datefmt': '%Y-%m-%dT%H:%M:%S%z',
            },
        },
        'disable_existing_loggers': False
    })
    return getLogger()