From 27af03ae9fb44ea728d24d26faa0c0192f7177d7 Mon Sep 17 00:00:00 2001 From: Eshan Roy Date: Mon, 25 Nov 2024 07:07:01 +0530 Subject: [PATCH] =?UTF-8?q?=F0=9F=94=A8=20refactor(=5Fconstruct):=20still?= =?UTF-8?q?=20under=20construction?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .vscode/settings.json | 3 + libs/functions.py | 166 +++++++++++++++++++++++++++++++++++++++++- 2 files changed, 168 insertions(+), 1 deletion(-) create mode 100644 .vscode/settings.json diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..642ff51 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,3 @@ +{ + "python.REPL.enableREPLSmartSend": false +} \ No newline at end of file diff --git a/libs/functions.py b/libs/functions.py index e2677e9..0a194f6 100644 --- a/libs/functions.py +++ b/libs/functions.py @@ -13,6 +13,9 @@ import requests import threading import pathlib import queue +from logging.handlers import TimedRotatingFileHandler +import tomlkit +import shutil import gi gi.require_version("Gtk", "3.0") # GTK 2.0 is dead! @@ -151,4 +154,165 @@ def get_latest_kernel_updates(self): refresh_cache(self) return True else: - logger.info \ No newline at end of file + logger.info("Linux Kernel Update Failed!") + return False + else: + logger.error("Failed to get Response Code!") + logger.error(response.text) + return False + else: + logger.info("Update Check Not Required!") + return False + else: + logger.info("No Cache File Preset at the Moment!") + if not os.path.exists(cache_update): + last_update_check = datetime.datetime.now().strftime("%Y-%m-%d") + with open(cache_update, mode="w", encoding="utf-8") as f: + f.write("%s\n" % last_update_check) + permissions(cache_dir) + except Exception as e: + logger.error("Found error in get_latest_kernel_updates() %s" % e) + return True + +def get_cache_last_modified(): + try: + if os.path.exists(cache_file): + timestamp = datetime.datetime.fromtimestamp(pathlib.Path(cache_file).stat().st_mtime, tz=datetime.timezone.utc) + return "%s %s" %(timestamp.date(), str(timestamp.time()).split(".")[0]) + else: + return "Cache File Does not Exist!" + except Exception as e: + logger.error("Found Error in get_cache_last_modified() %s" % e) + +try: + if not os.path.exists(log_dir): + makedirs(log_dir) +except Exception as e: + logger.error("Found Error while creating log_dir %s" % e) + +tfh = TimedRotatingFileHandler(event_log_file, encoding="utf-8", delay=False, when="W4") +tfh.setFormatter(formatter) +logger.addHandler(tfh) + +def setup_config(self): + try: + if not os.path.exists(config_dir): + makedirs(config_dir) + if not os.path.exists(config_file): + # makedirs(config_file) + shutil.copy(config_file_default, config_dir) + permissions(config_dir) + return read_config(self) + except Exception as e: + logger.error("Found error in setup_config() %s" % e) + +def update_config(config_data, bootloader): + try: + logger.info("Update Configuration Data...") + with open(config_file, "w") as f: + tomlkit.dump(config_data, f) + return True + except Exception as e: + logger.error("Found error in update_config() %s" % e) + +def read_config(self): + try: + logger.info("Reading config file: %s" %config_file) + config_data = None + with open(config_file, "rb") as f: + config_data = tomlkit.load(f) + if (config_data.get("kernels") and "official" in config_data["kernels"] is not None): + for i in config_data["kernels"]["official"]: + supported_kernel_dict[i["name"]] = (i["description"], i["headers"]) + if (config_data.get("kernels") and "community" in config_data["kernels"] is not None): + for j in config_data["kernels"]["community"]: + community_kernels_dict[j["name"]] = (j["description"], j["headers"], j["repository"]) + if (config_data.get("logging") is not None and "loglevel" in config_data["logging"] is not None): + loglevel = config_data["logging"]["loglevel"].lower() + logger.info("Setting loglevel: %s" % loglevel) + if loglevel == "debug": + logger.setLevel(logging.DEBUG) + elif loglevel == "info": + logger.setLevel(logging.INFO) + else: + logger.warning("Invalid loglevel found! Available: Info/Debug") + logger.setLevel(logging.INFO) + else: + logger.setLevel(logging.INFO) + return config_data + except Exception as e: + logger.error("Found error in read_config() %s" % e) + sys.exit(1) + +def create_cache_dir(): + try: + if not os.path.exists(cache_dir): + makedirs(cache_dir) + logger.info("Cache Directory: %s" % cache_dir) + permissions(cache_dir) + except Exception as e: + logger.error("Found error in create_cache_dir() %s" %e) + +def create_log_dir(): + try: + if not os.path.exists(log_dir): + makedirs(log_dir) + logger.info("Log Directory: %s" % log_dir) + except Exception as e: + logger.error("Found error in create_log_dir() %s" %e) + +def install_archive_kernel(self): + try: + logger.debug("Cleaning pacman cache and removing official packages...") + if os.path.exists(pacman_cache): + for root, dirs, files in os.walk(pacman_cache): + for name in files: + for official_kernels in supported_kernel_dict.keys(): + if name.startswith(official_kernels): + if os.path.exists(os.path.join(root, name)): + os.remove(os.path.join(root, name)) + + install_cmd_str = ["pacman", "-U", self.official_kernels[0], self.official_kernels[1], "--noconfirm", "--needed"] + # Need to wait for process + wait_for_pacman_process() + + if logger.getEffectiveLevel() == 10: + logger.debug("Running %s" % install_cmd_str) + event = "%s [INFO] Running %s\n" %(datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S"), " ".join(install_cmd_str)) + error = False + self.messages_queue.put(event) + with subprocess.Popen(install_cmd_str, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, universal_newlines=True,env=locale_env) as process: + while True: + if process.poll() is not None: + break + for line in process.stdout: + if logger.getEffectiveLevel() == 10: + print(line.strip()) + self.messages_queue.put(line) + if "no space left on device" in line.lower().strip(): + self.restore_kernel = None + error = True + break + if "initcpio" in line.lower().strip(): + if "image generation successfull" in line.lower().strip(): + error = False + break + if ("installation finished, no error reported" in line.lower().strip()): + error = False + break + if "error" in line.lower().strip() or "errors" in line.lower().strip(): + error = True + break + if error is True: + self.errors_found = True + error = True + GLib.idle_add( + show_mw, + self, + "System changes", + f"kernel {self.action} failed!\n" + f"There have been errors, please review the log", + priority=GLib.PRIORITY_DEFAULT, + ) + +def wait_for_pacman_process(): \ No newline at end of file