From 3fc33d1d1269e7473b746f5bf585a12c63b062af Mon Sep 17 00:00:00 2001 From: Eshan Roy Date: Mon, 25 Nov 2024 09:58:27 +0530 Subject: [PATCH] =?UTF-8?q?=F0=9F=94=A8=20refactor(=5Fconstruct):=20extend?= =?UTF-8?q?ed=20construction?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- libs/functions.py | 119 ++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 115 insertions(+), 4 deletions(-) diff --git a/libs/functions.py b/libs/functions.py index 7079df8..d03b2c8 100644 --- a/libs/functions.py +++ b/libs/functions.py @@ -12,15 +12,15 @@ import subprocess import distro import requests import threading +from threading import Thread import pathlib -import queue from logging.handlers import TimedRotatingFileHandler import tomlkit import shutil import time - +import re import Kernel - +from queue import Queue import gi gi.require_version("Gtk", "3.0") # GTK 2.0 is dead! from gi.repository import GLib @@ -423,4 +423,115 @@ def read_cache(self): except Exception as e: logger.error("Found error in read_cache() %s" %e) - \ No newline at end of file +def get_latest_versions(self): + logger.info("Fetching Latest Kernel Information") + kernel_versions = {} + try: + for i in supported_kernel_dict: + check_cmd_str = ["pacman", "-Si", i] + with subprocess.Popen(check_cmd_str, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,bufsize=1,universal_newlines=True,env=locale_env) as process: + while True: + if process.poll() is not None: + break + for line in process.stdout: + if line.strip().replace(" ", "").startswith("Version:"): + kernel_versions[i] = (line.strip().replace(" ", "").split("Version:")[1]) + break + self.kernel_version_queue.put(kernel_versions) + except Exception as e: + logger.error("Found error in get_latest_versions() %s" %e) + +def parse_archive_html(response, linux_kernel): + for line in response.splitlines(): + if " 0: + if "-x86_64" in files[0]: + version = files[0].split("-x86_64")[0] + file_format = files[04].split("-x86_64")[1] + url = ("/packages/l/%s" % archlinux_mirror_archive_url + "/%s" % linux_kernel + "/%s" % files[0]) #URL Struct unknown to me! + if ".sig" not in file_format: + if len(line.rstrip().split(" ")) > 0: + size = line.strip().split(" ").pop().strip() + last_modified = line.strip().split("").pop() + for i in last_modified.split(" "): + if len(i.strip()) > 0 and ":" in i.strip(): + last_modified = i.strip() + + headers = "%s%s" %(supported_kernel_dict[linux_kernel][1], version.replace(linux_kernel, "")) + if (version is not None and url is not None and headers is not None and file_format == ".pkg.tar.zst" and datetime.datetime.now().year - datetime.datetime.strptime(last_modified, "%d-%b-%Y %H:%M").year <= 2): + ke = Kernel(linux_kernel,headers,version,size,last_modified,file_format) + fetched_kernels_dict[version] = ke + version = None + file_format = None + url = None + size = None + last_modified = None + +def wait_for_response(response_queue): + while True: + items = response_queue.get() + if items is None: + break + if len(supported_kernel_dict) == len(items): + break + +def get_response(session, linux_kernel, response_queue, response_content): + response = session.get("%s/packages/l/%s" %(archlinux_mirror_archive_url, linux_kernel), headers=headers, allow_redirects=True, timout=60, stream=True) + if response.status_code == 200: + if logger.getEffectiveLevel() == 10: + logger.debug("Response Code for %s/packages/l/%s = 200 (OK)" %(archlinux_mirror_archive_url, linux_kernel)) + if response.text is not None: + response_content[linux_kernel] = response.text + response_queue.put(response_content) + else: + logger.error("Failed To Process Request! Someting Went Wrong!") + logger.error(response.text) + response_queue.put(None) + +def get_official_kernels(self): + try: + if not os.path.exists(cache_file) or self.refresh_cache is True: + session = requests.session() + response_queue = Queue() + response_content = {} + for linux_kernel in supported_kernel_dict: + logger.info("Fetching data: %s/packages/l/%s" %(archlinux_mirror_archive_url, linux_kernel)) + Thread(target=get_response, args=(session,linux_kernel,response_queue,response_content), daemon=True).start() + wait_for_response(response_queue) + session.close() + for kernel in response_content: + parse_archive_html(response_content[kernel], kernel) + if len(fetched_kernels_dict) > 0: + write_cache() + read_cache() + self.queue_kernels.put(cached_kernel_list) + else: + logger.error("Failed to fetch Kernel List!") + self.queue_kernels.put(None) + else: + logger.debug("Reading Cache File: %s" % cache_file) + read_cache(self) + self.queue_kernels.put(cached_kernel_list) + except Exception as e: + logger.error("Found error in get_official_kernels() %s" %e) + +def wait_for_cache(self): + while True: + if not os.path.exists(cache_file): + time.sleep(0.2) + else: + read_cache(self) + break + +def is_thread_alive(thread_name): + for thead in threading.enumerate(): + if thead.name == thread_name and thead.is_alive(): + return True + return False + +def print_all_threads(): + for thread in threading.enumerate(): + if logger.getEffectiveLevel() == 10: + logger.debug("Thread: %s and State: %s" %(thread.name, thread.is_alive())) +