#!/usr/bin/env python import glob import logging import re import sys from time import sleep import yaml from simple_pid import PID SYSFS_HWMON_BASE = "/sys/class/hwmon/" class ThermalZone: def __init__(self, config, pyfan_parent) -> None: self.fans = config["fan"] self.temp_source = config["source"] self.factor = 1 / config["factor"] self.name = config["name"] self.target = config["target"] self.pyfan = pyfan_parent self.hwmap = self.pyfan.hwmap self.alias_replace = re.compile("|".join(self.hwmap.keys())) if "interval" not in config: config["interval"] = 3 logging.getLogger("pyfan").warning( "[%s] No interval specified, using default. This is deprecated since 1.6 and may be removed in future " "versions. See example config for reference.", self.name, ) self.pid = PID( config["pid"]["p"], config["pid"]["i"], config["pid"]["d"], setpoint=0, sample_time=config["interval"], ) # we prefer 3 for pwm_enable, but since some chips do not support this mode, use 1 with pwm_min of 1 as fallback # more on pwm=0 behaviour: https://www.kernel.org/doc/html/latest/hwmon/pwm-fan.html self.pid.output_limits = (1, 255) self.setup_pwm() logging.getLogger("pyfan").info( "[%s] Source=%s Fans=%s Factor=%f %s", self.name, self.temp_source, self.fans, self.factor, self.pid, ) def eval(self): if self.get_temp(): diff = self.target - self.get_temp() val = int(self.pid(diff)) try: for target_fan in self.fans: if isinstance(target_fan, dict): fan = list(target_fan.keys())[0] fan_val = list(target_fan.values())[0] if isinstance(fan_val, list): if len(fan_val) < 2: logging.getLogger("pyfan").warning( "[%s] max/min for %s was not set correctly (%s)", self.name, fan, fan_val, ) if self.read_sysfs(fan) != min( fan_val[1], max(val, fan_val[0]) ): self.write_sysfs( fan, min(fan_val[1], max(val, fan_val[0])) ) elif self.read_sysfs(fan) != min(val, fan_val): self.write_sysfs(fan, min(val, fan_val)) logging.getLogger("pyfan").debug( "[%s] %s=%i%%", self.name, fan, int(int(self.read_sysfs(fan)) / 255 * 100), ) elif self.read_sysfs(target_fan) != val: self.write_sysfs(target_fan, val) except OSError as err: logging.getLogger("pyfan").warning( "[%s] Failed to set pwm, trying to reset it. (%s)", self.name, err.strerror, ) self.setup_pwm(1) logging.getLogger("pyfan").debug( "[%s] %i%% D:%iC T:%iC %s", self.name, int(val / 255 * 100), diff, self.get_temp(), self.pid, ) def get_temp(self): if isinstance(self.temp_source, list): max_temp = -1.0 for fan in self.temp_source: if self.read_sysfs(fan): max_temp = max(float(self.read_sysfs(fan)) * self.factor, max_temp) return max_temp else: if self.read_sysfs(self.temp_source): return float(self.read_sysfs(self.temp_source)) * self.factor else: return None def restore(self): self.setup_pwm(2) def setup_pwm(self, value=1): for target_fan in self.fans: try: if isinstance(target_fan, dict): self.set_pwm_mode(list(target_fan.keys())[0], value) else: self.set_pwm_mode(target_fan, value) except FileNotFoundError: logging.getLogger("pyfan").warning( "[%s] pwm not found. Try reloading hwmon map...", self.name ) self.hwmap = self.pyfan.hwmap def replace_alias(self, path): replaced = self.alias_replace.sub(lambda x: self.hwmap[x.group()], path) logging.getLogger("pyfan").debug("[ALIAS] %s -> %s", path, replaced) return replaced def build_pwm_path(self, specific): return self.replace_alias(SYSFS_HWMON_BASE + specific) def write_sysfs(self, path, value): with open(self.build_pwm_path(path), "w") as sysfs_f: sysfs_f.write(str(value)) def read_sysfs(self, path): try: with open(self.build_pwm_path(path)) as sysfs_f: return sysfs_f.readline() except FileNotFoundError as err: logging.getLogger("pyfan").warning( "[%s] temp source not found. Not ready yet or wrong path? (%s)", self.name, err.strerror, ) return None def set_pwm_mode(self, path, value=1): self.write_sysfs(path + "_enable", value) class PyFan: def __init__(self, config="/etc/pyfan") -> None: self.config = self.__load_config(config) logging.basicConfig(level=logging.getLevelName(self.config["loglevel"])) self.zones = [] if "pid_interval" not in self.config: self.interval = 0.2 logging.getLogger("pyfan").warning( "No pid_interval specified, using default. This is deprecated since 1.6 and may be removed in future " "versions. See example config for reference." ) else: self.interval = self.config["pid_interval"] for zone in self.config["thermalzones"]: self.zones.append(ThermalZone(zone, self)) logging.getLogger("pyfan").info( "Created %d thermal zones, pid_interval=%f.", len(self.zones), self.interval ) def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): for zone in self.zones: zone.restore() def eval(self): for zone in self.zones: zone.eval() @property def hwmap(self): hwmon_map = {} names = glob.glob(SYSFS_HWMON_BASE + "hwmon*/name") for name in names: hwmon = name.split("/")[-2] with open(name) as file: hw_name = file.read().strip() hwmon_map[hw_name] = hwmon return hwmon_map @staticmethod def __load_config(path): with open(path) as cfg_file: return yaml.safe_load(cfg_file) if __name__ == "__main__": with PyFan() as pyfan: while True: try: pyfan.eval() sleep(pyfan.interval) except KeyboardInterrupt: sys.exit(0)