diff --git a/.vscode/settings.json b/.vscode/settings.json index 642ff51..6ea42d2 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -1,3 +1,6 @@ { - "python.REPL.enableREPLSmartSend": false -} \ No newline at end of file + "python.REPL.enableREPLSmartSend": false, + "python.analysis.extraPaths": [ + "/usr/lib/python3.12/site-packages" // Adjust this based on your distribution + ] +} diff --git a/libs/functions.py b/libs/functions.py index 610754c..fdd2a86 100644 --- a/libs/functions.py +++ b/libs/functions.py @@ -378,6 +378,39 @@ def wait_for_pacman_process(): def check_pacman_lockfile(): return os.path.exists(pacman_lockfile) +def check_pacman_process(self): + try: + process_found = False + for i in psutil.process_iter(): + try: + pinfo = i.as_dict(attrs=["pid", "name","create_time"]) + if pinfo["name"] == "pacman": + process_found = True + except (psutil.NoSuchProcess, psutil.AccessDenied): + continue + if process_found is True: + logger.info("Pacman is already running...") + return True + else: + return False + except Exception as e: + logger.error("Found error in check_pacman_process(): %s" % e) + +def check_pacman_repo(repo): + logger.info("Checking repository:%s is configured..." % repo) + query_str = ["pacman-conf", "-r", repo] + try: + with subprocess.Popen(query_str,stdout=subprocess.PIPE,stderr=subprocess.STDOUT,universal_newlines=True,env=locale_env) as process: + while True: + if process.poll() is not None: + break + if process.returncode == 0: + return True + else: + return False + except Exception as e: + logger.info("Found error in check_pacman_repo(): %s" %e) + def read_cache(self): try: self.timestamp = None @@ -754,4 +787,215 @@ def get_pacman_repos(package_name): logger.error("Failed to get Installed Kernel Information!") except Exception as e: logger.error("Found error in get_pacman_repos(): %s" %e) - \ No newline at end of file + +def get_installed_kernels(): + logger.info("Get installed Kernel Information...") + query_str = ["pacman", "-Q"] + installed_kernels = [] + try: + with subprocess.Popen(query_str,shell=False,stdout=subprocess.PIPE,stderr=subprocess.STDOUT,universal_newlines=True,env=locale_env) as process: + while True: + if process.poll() is not None: + break + for i in process.stdout: + if i.lower().strip().startswith("linux"): + package_name = i.split(" ")[0].strip() + package_version = i.split(" ")[1].strip() + if package_name in supported_kernel_dict or package_name in community_kernels_dict: + if logger.getEffectiveLevel() == 10: + logger.debug("Installed Linux Package: %s" %package_name) + install_size, install_date = get_installed_kernels_info(package_name) + installed_kernel = InstalledKernel(package_name,package_version,install_date,install_size) + installed_kernels.append(installed_kernel) + except Exception as e: + logger.error("Found error in get_installed_kernel(): %s" %e) + finally: + return installed_kernels + +def get_installed_kernels_info(package_name): + logger.info("Get installed Kernel Information...") + query_str = ["pacman", "-Qi", package_name] + try: + process_kernel_query = subprocess.Popen(query_str,shell=False,stdout=subprocess.PIPE,stderr=subprocess.PIPE,env=locale_env) + out, err = process_kernel_query.communicate(timeout=process_timeout) + install_size = None + install_date = None + if process_kernel_query.returncode == 0: + for i in out.decode("utf-8").splitlines(): + if i.startswith("Installed Size :"): + install_size = i.split("Installed Size :")[1].strip() + if logger.getEffectiveLevel() == 10: + logger.debug("Installed Kernel: %s | Size: %s" % (package_name, install_size)) + if "MiB" in install_size: + if install_size.find(",") >= 0: + if logger.getEffectiveLevel() == 10: + logger.debug("Found ','-comma inside installed size!") + install_size = round(float(install_size.replace(",", ".").strip().replace("MiB","").strip())*1.048576,1) + else: + install_size = round(float(install_size.replace("MiB","").strip())*1.048576,1) + if i.startswith("Install Date :"): + install_date = i.split("Install Date :")[1].strip() + return install_date + else: + logger.error("Failed to get Installed Kernel Information!") + except Exception as e: + logger.error("Found error in get_installed_kernel_info(): %s" %e) + +def get_active_kernel(): + logger.info("Getting active kernel info...") + query_str = ["kernel-install"] + try: + process_kernel_query = subprocess.Popen(query_str,shell=False,stdout=subprocess.PIPE,stderr=subprocess.PIPE,env=locale_env) + out, err = process_kernel_query.communicate(timeout=process_timeout) + if process_kernel_query.returncode == 0: + for i in out.decode("utf-8").splitlines(): + if (i.strip() > 0): + if "Kernel Version:" in i: + logger.info("Active Kernel: %s" % i.split("Kernel Version:")[1].strip()) + return i.split("Kernel Version:")[1].strip() + else: + return "Unknown Kernel Detected!" + except Exception as e: + logger.error("Found error in get_active_kernel(): %s" %e) + +def sync_pacman_db(): + try: + query_str = ["pacman", "-Sy"] + logger.info("Synchronizing package databases...") + if logger.getEffectiveLevel() == 10: + logger.debug("Executing: %s" %query_str) + process = subprocess.Popen(query_str,shell=False,stdout=subprocess.PIPE,stderr=subprocess.STDOUT,env=locale_env) + out, err = process.communicate(timeout=100) + if logger.getEffectiveLevel() == 10: + print(out.decode("utf-8")) + if process.returncode == 10: + return None + else: + return out.decode("utf-8") + except Exception as e: + logger.error("Found error in sync_pacman_db(): %s" %e) + +def get_boot_loader(): + try: + logger.info("Getting Bootloader Info...") + query_str = ["bootctl", "status"] + if logger.getEffectiveLevel() == 10: + logger.debug("Executing: %s" % " ".join(query_str)) + process=subprocess.Popen(query_str,shell=False,stdout=subprocess.PIPE,stderr=subprocess.STDOUT,timout=process_timeout,universal_newlines=True,bufsize=1,env=locale_env) + if process.returncode == 0: + for i in process.stdout.splitlines(): + if i.strip().startswith("Products:"): + product = i.strip().split("Product:")[1].strip() + if "grub" in product.lower(): + logger.info("Bootloader: GRUB") + return "grub" + elif "systemd-boot" in product.lower(): + logger.info("Bootloader: systemd-boot") + return "systemd-boot" + else: + logger.info("bootctl status n/a! Using default: GRUB") + return "grub" + elif i.strip().startswith("Not booted with EFI"): + logger.info("bootctl - not booted with EFI, using GRUB") + return "grub" + else: + logger.error("Failed to execute: %s" % " ".join(query_str)) + logger.error(process.stdout) + except Exception as e: + logger.error("Found error in get_boot_loader(): %s" % e) + +def get_kernel_modules_version(kernel, db): + query_str = None + if db == "local": + if logger.getEffectiveLevel() == 10: + logger.debug("Getting kernel module information locally...") + query_str = ["pacman", "-Qli", kernel] + if db == "package": + if logger.getEffectiveLevel() == 10: + logger.debug("Getting kernel module information from cache...") + query_str = ["pacman", "-Qlip", kernel] + kernel_modules_path = None + try: + if logger.getEffectiveLevel() == 10: + logger.debug("Executing: %s" %query_str) + process=subprocess.Popen(query_str,shell=False,stdout=subprocess.PIPE,stderr=subprocess.STDOUT,timout=process_timeout,universal_newlines=True,bufsize=1,env=locale_env) + if process.returncode == 0: + for i in process.stdout.splitlines(): + if "usr/lib/modules/" in i: + if "kernel" in i.split(" ")[1]: + kernel_modules_path = i.split(" ")[1] + break + if kernel_modules_path is not None: + return (kernel_modules_path.split("usr/lib/modules/")[1].strip().split("/kernel/")[0].strip()) + else: + return None + else: + return None + except Exception as e: + logger.error("Found error in get_kernel_modules_version(): %s" %e) + +def run_process(self): + error = False + self.stdout_lines = [] + if logger.getEffectiveLevel() == 10: + logger.debug("Executing: %s"%" ".join(self.query_str)) + event = "%s [INFO] Running %s\n" %(datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S"), " ".join(self.query_str)) + self.messages_queue.put(event) + with subprocess.Popen(self.query_str,stdout=subprocess.PIPE,stderr=subprocess.STDOUT,universal_newlines=True,env=locale_env) as process: + while True: + if process.poll() is not None: + break + for i in process.stdout: + self.messages_queue.put(i) + self.stdout_lines.append(i.lower().strip()) + if logger.getEffectiveLevel() == 10: + print(i.strip()) + for log in self.stdout_lines: + if "error" in log or "errors" in log or "failed" in log: + self.errors_found = True + error = True + if error is True: + logger.error("Failed to run: %s" % " ".join(self.query_str)) + return True + else: + logger.info("Process %s completed!" % " ".join(self.query_str)) + return False + +def kernel_initrd(self): + logger.info("Adding/Removing Kernel & Initrd Images!") + pkg_modules_version = None + if self.action == "install": + if self.source == "official": + pkg_modules_version = get_kernel_modules_version("%s/%s" % (pacman_cache, "%s-x86_64%s" % (self.kernel.version,self.kernel.file_format)), "package") + if self.source == "community": + pkg_modules_version = get_kernel_modules_version("%s%s" % (pacman_cache, "%s-%s-x86_64.pkg.tar.zst" % (self.kernel.name,self.kernel.version)), "package") + if pkg_modules_version is None: + logger.error("Failed to extract modules version from package!") + return 1 + logger.debug("Package Modules Version: %s" % pkg_modules_version) + if self.action == "install": + logger.info("Adding Kernel & Initrd images to /boot") + self.image = "images/48x48/sks-install.png" + if self.local_modules_version is not None: + for self.query_str in [["kernel-install","remove",self.local_modules_version], ["kernel-install","add",pkg_modules_version,"/lib/modules/%s/vmlinuz" % pkg_modules_version]]: + err = run_process(self) + if err is True: + return 1 + else: + self.query_str = ["kernel-install","add",pkg_modules_version,"/lib/modules/%s/vmlinuz" % pkg_modules_version] + err = run_process(self) + if err is True: + return 1 + else: + logger.info("Removing Kernel & Initrd images from /boot") + self.image = "images/48x48/sks-remove.png" + if self.local_modules_version is not None: + self.query_str = ["kernel-install","remove",self.local_modules_version] + err = run_process(self) + if err is True: + return 1 + +def show_mw(self, title, msg): + mw = MessageWindow(title=title,message=msg,detailed_message=False,transient_for=self) + mw.present() +