🔨 refactor(IDK): extended settings and functions

This commit is contained in:
Eshan Roy
2024-11-26 15:35:49 +05:30
parent cff779181d
commit 51bf77cc8f
2 changed files with 250 additions and 3 deletions

View File

@@ -1,3 +1,6 @@
{
"python.REPL.enableREPLSmartSend": false
}
"python.REPL.enableREPLSmartSend": false,
"python.analysis.extraPaths": [
"/usr/lib/python3.12/site-packages" // Adjust this based on your distribution
]
}

View File

@@ -378,6 +378,39 @@ def wait_for_pacman_process():
def check_pacman_lockfile():
return os.path.exists(pacman_lockfile)
def check_pacman_process(self):
try:
process_found = False
for i in psutil.process_iter():
try:
pinfo = i.as_dict(attrs=["pid", "name","create_time"])
if pinfo["name"] == "pacman":
process_found = True
except (psutil.NoSuchProcess, psutil.AccessDenied):
continue
if process_found is True:
logger.info("Pacman is already running...")
return True
else:
return False
except Exception as e:
logger.error("Found error in check_pacman_process(): %s" % e)
def check_pacman_repo(repo):
logger.info("Checking repository:%s is configured..." % repo)
query_str = ["pacman-conf", "-r", repo]
try:
with subprocess.Popen(query_str,stdout=subprocess.PIPE,stderr=subprocess.STDOUT,universal_newlines=True,env=locale_env) as process:
while True:
if process.poll() is not None:
break
if process.returncode == 0:
return True
else:
return False
except Exception as e:
logger.info("Found error in check_pacman_repo(): %s" %e)
def read_cache(self):
try:
self.timestamp = None
@@ -754,4 +787,215 @@ def get_pacman_repos(package_name):
logger.error("Failed to get Installed Kernel Information!")
except Exception as e:
logger.error("Found error in get_pacman_repos(): %s" %e)
def get_installed_kernels():
logger.info("Get installed Kernel Information...")
query_str = ["pacman", "-Q"]
installed_kernels = []
try:
with subprocess.Popen(query_str,shell=False,stdout=subprocess.PIPE,stderr=subprocess.STDOUT,universal_newlines=True,env=locale_env) as process:
while True:
if process.poll() is not None:
break
for i in process.stdout:
if i.lower().strip().startswith("linux"):
package_name = i.split(" ")[0].strip()
package_version = i.split(" ")[1].strip()
if package_name in supported_kernel_dict or package_name in community_kernels_dict:
if logger.getEffectiveLevel() == 10:
logger.debug("Installed Linux Package: %s" %package_name)
install_size, install_date = get_installed_kernels_info(package_name)
installed_kernel = InstalledKernel(package_name,package_version,install_date,install_size)
installed_kernels.append(installed_kernel)
except Exception as e:
logger.error("Found error in get_installed_kernel(): %s" %e)
finally:
return installed_kernels
def get_installed_kernels_info(package_name):
logger.info("Get installed Kernel Information...")
query_str = ["pacman", "-Qi", package_name]
try:
process_kernel_query = subprocess.Popen(query_str,shell=False,stdout=subprocess.PIPE,stderr=subprocess.PIPE,env=locale_env)
out, err = process_kernel_query.communicate(timeout=process_timeout)
install_size = None
install_date = None
if process_kernel_query.returncode == 0:
for i in out.decode("utf-8").splitlines():
if i.startswith("Installed Size :"):
install_size = i.split("Installed Size :")[1].strip()
if logger.getEffectiveLevel() == 10:
logger.debug("Installed Kernel: %s | Size: %s" % (package_name, install_size))
if "MiB" in install_size:
if install_size.find(",") >= 0:
if logger.getEffectiveLevel() == 10:
logger.debug("Found ','-comma inside installed size!")
install_size = round(float(install_size.replace(",", ".").strip().replace("MiB","").strip())*1.048576,1)
else:
install_size = round(float(install_size.replace("MiB","").strip())*1.048576,1)
if i.startswith("Install Date :"):
install_date = i.split("Install Date :")[1].strip()
return install_date
else:
logger.error("Failed to get Installed Kernel Information!")
except Exception as e:
logger.error("Found error in get_installed_kernel_info(): %s" %e)
def get_active_kernel():
logger.info("Getting active kernel info...")
query_str = ["kernel-install"]
try:
process_kernel_query = subprocess.Popen(query_str,shell=False,stdout=subprocess.PIPE,stderr=subprocess.PIPE,env=locale_env)
out, err = process_kernel_query.communicate(timeout=process_timeout)
if process_kernel_query.returncode == 0:
for i in out.decode("utf-8").splitlines():
if (i.strip() > 0):
if "Kernel Version:" in i:
logger.info("Active Kernel: %s" % i.split("Kernel Version:")[1].strip())
return i.split("Kernel Version:")[1].strip()
else:
return "Unknown Kernel Detected!"
except Exception as e:
logger.error("Found error in get_active_kernel(): %s" %e)
def sync_pacman_db():
try:
query_str = ["pacman", "-Sy"]
logger.info("Synchronizing package databases...")
if logger.getEffectiveLevel() == 10:
logger.debug("Executing: %s" %query_str)
process = subprocess.Popen(query_str,shell=False,stdout=subprocess.PIPE,stderr=subprocess.STDOUT,env=locale_env)
out, err = process.communicate(timeout=100)
if logger.getEffectiveLevel() == 10:
print(out.decode("utf-8"))
if process.returncode == 10:
return None
else:
return out.decode("utf-8")
except Exception as e:
logger.error("Found error in sync_pacman_db(): %s" %e)
def get_boot_loader():
try:
logger.info("Getting Bootloader Info...")
query_str = ["bootctl", "status"]
if logger.getEffectiveLevel() == 10:
logger.debug("Executing: %s" % " ".join(query_str))
process=subprocess.Popen(query_str,shell=False,stdout=subprocess.PIPE,stderr=subprocess.STDOUT,timout=process_timeout,universal_newlines=True,bufsize=1,env=locale_env)
if process.returncode == 0:
for i in process.stdout.splitlines():
if i.strip().startswith("Products:"):
product = i.strip().split("Product:")[1].strip()
if "grub" in product.lower():
logger.info("Bootloader: GRUB")
return "grub"
elif "systemd-boot" in product.lower():
logger.info("Bootloader: systemd-boot")
return "systemd-boot"
else:
logger.info("bootctl status n/a! Using default: GRUB")
return "grub"
elif i.strip().startswith("Not booted with EFI"):
logger.info("bootctl - not booted with EFI, using GRUB")
return "grub"
else:
logger.error("Failed to execute: %s" % " ".join(query_str))
logger.error(process.stdout)
except Exception as e:
logger.error("Found error in get_boot_loader(): %s" % e)
def get_kernel_modules_version(kernel, db):
query_str = None
if db == "local":
if logger.getEffectiveLevel() == 10:
logger.debug("Getting kernel module information locally...")
query_str = ["pacman", "-Qli", kernel]
if db == "package":
if logger.getEffectiveLevel() == 10:
logger.debug("Getting kernel module information from cache...")
query_str = ["pacman", "-Qlip", kernel]
kernel_modules_path = None
try:
if logger.getEffectiveLevel() == 10:
logger.debug("Executing: %s" %query_str)
process=subprocess.Popen(query_str,shell=False,stdout=subprocess.PIPE,stderr=subprocess.STDOUT,timout=process_timeout,universal_newlines=True,bufsize=1,env=locale_env)
if process.returncode == 0:
for i in process.stdout.splitlines():
if "usr/lib/modules/" in i:
if "kernel" in i.split(" ")[1]:
kernel_modules_path = i.split(" ")[1]
break
if kernel_modules_path is not None:
return (kernel_modules_path.split("usr/lib/modules/")[1].strip().split("/kernel/")[0].strip())
else:
return None
else:
return None
except Exception as e:
logger.error("Found error in get_kernel_modules_version(): %s" %e)
def run_process(self):
error = False
self.stdout_lines = []
if logger.getEffectiveLevel() == 10:
logger.debug("Executing: %s"%" ".join(self.query_str))
event = "%s [INFO] Running %s\n" %(datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S"), " ".join(self.query_str))
self.messages_queue.put(event)
with subprocess.Popen(self.query_str,stdout=subprocess.PIPE,stderr=subprocess.STDOUT,universal_newlines=True,env=locale_env) as process:
while True:
if process.poll() is not None:
break
for i in process.stdout:
self.messages_queue.put(i)
self.stdout_lines.append(i.lower().strip())
if logger.getEffectiveLevel() == 10:
print(i.strip())
for log in self.stdout_lines:
if "error" in log or "errors" in log or "failed" in log:
self.errors_found = True
error = True
if error is True:
logger.error("Failed to run: %s" % " ".join(self.query_str))
return True
else:
logger.info("Process %s completed!" % " ".join(self.query_str))
return False
def kernel_initrd(self):
logger.info("Adding/Removing Kernel & Initrd Images!")
pkg_modules_version = None
if self.action == "install":
if self.source == "official":
pkg_modules_version = get_kernel_modules_version("%s/%s" % (pacman_cache, "%s-x86_64%s" % (self.kernel.version,self.kernel.file_format)), "package")
if self.source == "community":
pkg_modules_version = get_kernel_modules_version("%s%s" % (pacman_cache, "%s-%s-x86_64.pkg.tar.zst" % (self.kernel.name,self.kernel.version)), "package")
if pkg_modules_version is None:
logger.error("Failed to extract modules version from package!")
return 1
logger.debug("Package Modules Version: %s" % pkg_modules_version)
if self.action == "install":
logger.info("Adding Kernel & Initrd images to /boot")
self.image = "images/48x48/sks-install.png"
if self.local_modules_version is not None:
for self.query_str in [["kernel-install","remove",self.local_modules_version], ["kernel-install","add",pkg_modules_version,"/lib/modules/%s/vmlinuz" % pkg_modules_version]]:
err = run_process(self)
if err is True:
return 1
else:
self.query_str = ["kernel-install","add",pkg_modules_version,"/lib/modules/%s/vmlinuz" % pkg_modules_version]
err = run_process(self)
if err is True:
return 1
else:
logger.info("Removing Kernel & Initrd images from /boot")
self.image = "images/48x48/sks-remove.png"
if self.local_modules_version is not None:
self.query_str = ["kernel-install","remove",self.local_modules_version]
err = run_process(self)
if err is True:
return 1
def show_mw(self, title, msg):
mw = MessageWindow(title=title,message=msg,detailed_message=False,transient_for=self)
mw.present()