from tuned import exports, logs
from tuned.utils.commands import commands
from tuned.consts import PPD_CONFIG_FILE
from tuned.ppd.config import PPDConfig, PPD_PERFORMANCE, PPD_POWER_SAVER
from enum import StrEnum
import threading
import dbus
import os
log = logs.get()
DRIVER = "tuned"
NO_TURBO_PATH = "/sys/devices/system/cpu/intel_pstate/no_turbo"
LAP_MODE_PATH = "/sys/bus/platform/devices/thinkpad_acpi/dytc_lapmode"
class PerformanceDegraded(StrEnum):
NONE = ""
LAP_DETECTED = "lap-detected"
HIGH_OPERATING_TEMPERATURE = "high-operating-temperature"
class ProfileHold(object):
def __init__(self, profile, reason, app_id, watch):
self.profile = profile
self.reason = reason
self.app_id = app_id
self.watch = watch
def as_dict(self):
return {
"Profile": self.profile,
"Reason": self.reason,
"ApplicationId": self.app_id,
}
class ProfileHoldManager(object):
def __init__(self, controller):
self._holds = {}
self._cookie_counter = 0
self._controller = controller
def _callback(self, cookie, app_id):
def callback(name):
if name == "":
log.info("Application '%s' disappeared, releasing hold '%s'" % (app_id, cookie))
self.remove(cookie)
return callback
def _effective_hold_profile(self):
if any(hold.profile == PPD_POWER_SAVER for hold in self._holds.values()):
return PPD_POWER_SAVER
return PPD_PERFORMANCE
def _cancel(self, cookie):
if cookie not in self._holds:
return
hold = self._holds.pop(cookie)
hold.watch.cancel()
exports.send_signal("ProfileReleased", cookie)
exports.property_changed("ActiveProfileHolds", self.as_dbus_array())
log.info("Releasing hold '%s': profile '%s' by application '%s'" % (cookie, hold.profile, hold.app_id))
def as_dbus_array(self):
return dbus.Array([hold.as_dict() for hold in self._holds.values()], signature="a{sv}")
def add(self, profile, reason, app_id, caller):
cookie = self._cookie_counter
self._cookie_counter += 1
watch = self._controller.bus.watch_name_owner(caller, self._callback(cookie, app_id))
log.info("Adding hold '%s': profile '%s' by application '%s'" % (cookie, profile, app_id))
self._holds[cookie] = ProfileHold(profile, reason, app_id, watch)
exports.property_changed("ActiveProfileHolds", self.as_dbus_array())
self._controller.switch_profile(profile)
return cookie
def has(self, cookie):
return cookie in self._holds
def remove(self, cookie):
self._cancel(cookie)
if len(self._holds) != 0:
new_profile = self._effective_hold_profile()
else:
new_profile = self._controller.base_profile
self._controller.switch_profile(new_profile)
def clear(self):
for cookie in self._holds:
self._cancel(cookie)
class Controller(exports.interfaces.ExportableInterface):
def __init__(self, bus, tuned_interface):
super(Controller, self).__init__()
self._bus = bus
self._tuned_interface = tuned_interface
self._profile_holds = ProfileHoldManager(self)
self._performance_degraded = PerformanceDegraded.NONE
self._cmd = commands()
self._terminate = threading.Event()
self.load_config()
def _check_performance_degraded(self):
performance_degraded = PerformanceDegraded.NONE
if os.path.exists(NO_TURBO_PATH):
if int(self._cmd.read_file(NO_TURBO_PATH)) == 1:
performance_degraded = PerformanceDegraded.HIGH_OPERATING_TEMPERATURE
if os.path.exists(LAP_MODE_PATH):
if int(self._cmd.read_file(LAP_MODE_PATH)) == 1:
performance_degraded = PerformanceDegraded.LAP_DETECTED
if performance_degraded != self._performance_degraded:
log.info("Performance degraded: %s" % performance_degraded)
self._performance_degraded = performance_degraded
exports.property_changed("PerformanceDegraded", performance_degraded)
def run(self):
exports.start()
while not self._cmd.wait(self._terminate, 1):
self._check_performance_degraded()
exports.stop()
@property
def bus(self):
return self._bus
@property
def base_profile(self):
return self._base_profile
def terminate(self):
self._terminate.set()
def load_config(self):
self._config = PPDConfig(PPD_CONFIG_FILE)
self._base_profile = self._config.default_profile
self.switch_profile(self._config.default_profile)
def switch_profile(self, profile):
if self.active_profile() == profile:
return
tuned_profile = self._config.ppd_to_tuned[profile]
log.info("Switching to profile '%s'" % tuned_profile)
self._tuned_interface.switch_profile(tuned_profile)
exports.property_changed("ActiveProfile", profile)
def active_profile(self):
tuned_profile = self._tuned_interface.active_profile()
return self._config.tuned_to_ppd.get(tuned_profile, "unknown")
@exports.export("sss", "u")
def HoldProfile(self, profile, reason, app_id, caller):
if profile != PPD_POWER_SAVER and profile != PPD_PERFORMANCE:
raise dbus.exceptions.DBusException(
"Only '%s' and '%s' profiles may be held" % (PPD_POWER_SAVER, PPD_PERFORMANCE)
)
return self._profile_holds.add(profile, reason, app_id, caller)
@exports.export("u", "")
def ReleaseProfile(self, cookie, caller):
if not self._profile_holds.has(cookie):
raise dbus.exceptions.DBusException("No active hold for cookie '%s'" % cookie)
self._profile_holds.remove(cookie)
@exports.signal("u")
def ProfileReleased(self, cookie):
pass
@exports.property_setter("ActiveProfile")
def set_active_profile(self, profile):
if profile not in self._config.ppd_to_tuned:
raise dbus.exceptions.DBusException("Invalid profile '%s'" % profile)
self._base_profile = profile
self._profile_holds.clear()
self.switch_profile(profile)
@exports.property_getter("ActiveProfile")
def get_active_profile(self):
return self.active_profile()
@exports.property_getter("Profiles")
def get_profiles(self):
return dbus.Array(
[{"Profile": profile, "Driver": DRIVER} for profile in self._config.ppd_to_tuned.keys()],
signature="a{sv}",
)
@exports.property_getter("Actions")
def get_actions(self):
return dbus.Array([], signature="s")
@exports.property_getter("PerformanceDegraded")
def get_performance_degraded(self):
return self._performance_degraded
@exports.property_getter("ActiveProfileHolds")
def get_active_profile_holds(self):
return self._profile_holds.as_dbus_array()