# Module for logging in subprocess. Process related info (call stack) is logged
# to simplify investigation of main process hangings
#
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2024 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
import atexit
import collections
import dataclasses
import logging
import logging.handlers
import os
import pathlib
import queue
import time
from datetime import datetime
from multiprocessing import Event, Process, Queue
from raven.handlers.logging import SentryHandler
import psutil
from lve_utils.sentry import init_lve_utils_sentry_client
# NOTE: set records history size relatively small,
# otherwise sentry will trim the log message with history records and process ancestors
DEFAULT_RECORDS_HISTORY_SIZE = 10
DEFAULT_LOG_EVENTS_TIMEOUT = 300
DEFAULT_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
@dataclasses.dataclass
class ProcessDescription:
"""
Process description object
pid: int - process id
ppid: int | None - parent process id
user: str | None - process owner
status: str | None - process status
start_time: datetime | None - process start time
cmdline: str | None - process command line
"""
pid: int
ppid: int | None
user: str | None
status: str | None
start_time: datetime | None
cmdline: str | None
@classmethod
def from_psutil(cls, process: psutil.Process):
"""Build process description object using psutil.Process object"""
try:
ppid = process.ppid()
except Exception:
ppid = None
try:
cmdline = ' '.join(process.cmdline())
except Exception:
cmdline = None
try:
start_time = datetime.fromtimestamp(process.create_time())
except Exception:
start_time = None
try:
user = process.username()
except Exception:
user = None
try:
status = process.status()
except Exception:
status = None
return cls(process.pid, ppid, user, status, start_time, cmdline)
def to_str(self):
return f"PID: {self.pid}, PPID: {self.ppid}, User: {self.user}, Status: {self.status} " \
f"Start time: {self.start_time} Command: {self.cmdline}"
class QueueListenerWithHistory(logging.handlers.QueueListener):
def __init__(
self,
queue,
*handlers,
timeout: int = DEFAULT_LOG_EVENTS_TIMEOUT,
records_history_size: int = DEFAULT_RECORDS_HISTORY_SIZE,
):
"""Queue listener with log records history
:param multiprocessing.Queue queue: log records queue
:param int timeout: log records flow timeout, defaults to DEFAULT_LOG_EVENTS_TIMEOUT
In case timeput is reached, last log records and process ancestors are logged
:param int records_history_size: the max number of records to log in case of timeout
"""
super().__init__(queue, *handlers, respect_handler_level=True)
self._records_history_queue: collections.deque[logging.LogRecord] = collections.deque(
iterable=[],
maxlen=records_history_size
)
self._timeout = timeout
self._timout_registred = False
def dequeue(self, _block):
"""Dequeue log event"""
while True:
try:
record = self.queue.get(timeout=self._timeout)
self._records_history_queue.append(record)
break
except queue.Empty:
self._handle_queue_timeout()
return record
def _monitor(self):
"""Main loop to process log events"""
self._handle_monitor_started()
super()._monitor()
self._handle_monitor_finished()
def _handle_monitor_started(self):
"""Start monitoring log events processing time"""
self._start_ts = time.time() # pylint: disable=attribute-defined-outside-init
def _handle_monitor_finished(self):
"""Log that monitor finished processing log records"""
if self._timout_registred is True:
r = self._create_record(
logging.ERROR,
'Processing log queue took %.4f seconds. Pid: %s',
time.time() - self._start_ts,
os.getpid(),
)
self.handle(r)
def _get_ancestors(self):
"""Collect process ancestors up to pid 1"""
ancestors: list[ProcessDescription] = []
try:
process = psutil.Process()
ancestors = [ProcessDescription.from_psutil(process)]
while (ppid := process.ppid()) != 0:
process = psutil.Process(ppid)
ancestors.append(ProcessDescription.from_psutil(process))
return ancestors
except Exception:
return ancestors
def _create_record(self, level, msg, *args):
"""Create log record"""
return logging.LogRecord(
name=__name__,
level=level,
pathname='',
lineno=1,
msg=msg,
args=args,
exc_info=None,
)
def _handle_queue_timeout(self):
"""
Handle log queue timeout event.
Print last events (incl. omitted because debug disabled) and process ancestors to log.
"""
if self._timout_registred is False:
proc_ancestors = '\n'.join(
proc_descr.to_str() for proc_descr in self._get_ancestors()[::-1]
)
last_events = '\n'.join(
f'[{record.levelname}] - {datetime.fromtimestamp(record.created)}: {record.message}'
for record in self._records_history_queue
)
error_msg = 'Log queue timeout. Last events:\n%s\nProcess ancestors:\n%s'
error_args = (last_events, proc_ancestors)
r = self._create_record(logging.ERROR, error_msg, *error_args)
self.handle(r)
self._timout_registred = True
def get_log_level(file_name: str):
"""Get log level based on debug flag presence"""
return logging.DEBUG if pathlib.Path(f'{file_name}.debug.flag').exists() else logging.INFO
def run_subpprocess_logger(q: Queue, stop_event: Event, file_name: str, timeout: int):
"""
Main function to process log events in subprocess
:param Queue q: log records queue
:param Event stop_event: stop event
:param str file_name: log file name
"""
sentry_handler = SentryHandler(init_lve_utils_sentry_client('lvectl'), level=logging.ERROR)
fh = logging.FileHandler(file_name)
fh.setFormatter(logging.Formatter(DEFAULT_LOG_FORMAT))
fh.setLevel(get_log_level(file_name))
q_handler = QueueListenerWithHistory(q, fh, sentry_handler, timeout=timeout)
q_handler.start()
stop_event.wait()
q_handler.stop()
@dataclasses.dataclass
class SubprocessLogger:
logger_process: Process
stop_event: Event
subprocess_loggers: dict[str | None, SubprocessLogger] = {}
def init_subprocess_logger(
logger_name: str | None,
file_name: str,
timeout: int = DEFAULT_LOG_EVENTS_TIMEOUT,
) -> logging.Logger:
"""
Get asynchronous logger running in subprocess
Log records asynchronously in subprocess (>=info by default, >= debug when debug flag is present).
Log records from debug and higher in subprocess in case of timeout.
WARNING: multiprocessing.Queue is used to pass log records. It works asynchornously
(passing to subprocess in handled by separate thread). This may lead to log records not
being passed to the subprocess in case main process got stuck e.g. in kmodlve kernel space.
:param str | None logger_name: logger name
:param str file_name: log file
:param int timeout: log events processing timeout
"""
global subprocess_loggers
subprocess_logger = subprocess_loggers.get(logger_name)
if subprocess_logger is not None:
return logging.getLogger(logger_name)
logger = logging.getLogger(logger_name)
# NOTE: We want to pass all the records (incl. debug) to the subprocess
# so that we can log them in case of timeout
logger.setLevel(logging.DEBUG)
subprocess_logger_queue: Queue = Queue()
queue_handler = logging.handlers.QueueHandler(subprocess_logger_queue)
queue_handler.setLevel(logging.DEBUG)
logger.addHandler(queue_handler)
stop_event = Event()
logger_process = Process(target=run_subpprocess_logger,
args=(subprocess_logger_queue, stop_event, file_name, timeout))
logger_process.start()
atexit.register(stop_event.set)
subprocess_loggers[logger_name] = SubprocessLogger(logger_process, stop_event)
return logger
def stop_subprocess_logger(name: str):
"""
Stop subprocess logger
"""
global subprocess_loggers
subprocess_logger = subprocess_loggers.get(name)
if subprocess_logger is None:
return
subprocess_logger.stop_event.set()
subprocess_logger.logger_process.join()
del subprocess_loggers[name]