diff --git a/usr/share/blackbox/Functions.py b/usr/share/blackbox/Functions.py
new file mode 100644
index 0000000..2fba411
--- /dev/null
+++ b/usr/share/blackbox/Functions.py
@@ -0,0 +1,2789 @@
+# =================================================================
+# = Author: Cameron Percival =
+# =================================================================
+
+import os
+import sys
+import psutil
+import time
+import datetime
+from datetime import datetime, timedelta
+import subprocess
+import threading
+import gi
+import logging
+from logging.handlers import TimedRotatingFileHandler
+import shutil
+from threading import Thread
+from Package import Package
+from Settings import Settings
+from ui.MessageDialog import MessageDialog
+from distro import id
+from os import makedirs
+
+gi.require_version("Gtk", "3.0")
+from gi.repository import GLib, Gtk
+
+# =====================================================
+# Base Directory
+# =====================================================
+
+base_dir = os.path.dirname(os.path.realpath(__file__))
+
+# =====================================================
+# Global Variables
+# =====================================================
+sudo_username = os.getlogin()
+home = "/home/" + str(sudo_username)
+path_dir_cache = base_dir + "/cache/"
+packages = []
+distr = id()
+sofirem_lockfile = "/tmp/sofirem.lock"
+sofirem_pidfile = "/tmp/sofirem.pid"
+# 10m timeout
+process_timeout = 600
+
+arcolinux_mirrorlist = "/etc/pacman.d/arcolinux-mirrorlist"
+pacman_conf = "/etc/pacman.conf"
+pacman_conf_backup = "/etc/pacman.conf.bak"
+pacman_logfile = "/var/log/pacman.log"
+pacman_lockfile = "/var/lib/pacman/db.lck"
+pacman_cache_dir = "/var/cache/pacman/pkg/"
+
+arco_test_repo = [
+ "#[arcolinux_repo_testing]",
+ "#SigLevel = PackageRequired DatabaseNever",
+ "#Include = /etc/pacman.d/arcolinux-mirrorlist",
+]
+
+arco_repo = [
+ "[arcolinux_repo]",
+ "SigLevel = PackageRequired DatabaseNever",
+ "Include = /etc/pacman.d/arcolinux-mirrorlist",
+]
+
+arco_3rd_party_repo = [
+ "[arcolinux_repo_3party]",
+ "SigLevel = PackageRequired DatabaseNever",
+ "Include = /etc/pacman.d/arcolinux-mirrorlist",
+]
+
+arco_xlrepo = [
+ "[arcolinux_repo_xlarge]",
+ "SigLevel = PackageRequired DatabaseNever",
+ "Include = /etc/pacman.d/arcolinux-mirrorlist",
+]
+
+
+log_dir = "/var/log/sofirem/"
+config_dir = "%s/.config/sofirem" % home
+config_file = "%s/sofirem.yaml" % config_dir
+
+
+event_log_file = "%s/event.log" % log_dir
+
+export_dir = "%s/sofirem-exports" % home
+
+
+# =====================================================
+# PERMISSIONS
+# =====================================================
+
+
+def permissions(dst):
+ try:
+ groups = subprocess.run(
+ ["sh", "-c", "id " + sudo_username],
+ shell=False,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
+ )
+ for x in groups.stdout.decode().split(" "):
+ if "gid" in x:
+ g = x.split("(")[1]
+ group = g.replace(")", "").strip()
+ subprocess.call(["chown", "-R", sudo_username + ":" + group, dst], shell=False)
+
+ except Exception as e:
+ logger.error(e)
+
+
+# Create log, export, conf directory
+try:
+ if not os.path.exists(log_dir):
+ makedirs(log_dir)
+
+ if not os.path.exists(export_dir):
+ makedirs(export_dir)
+
+ if not os.path.exists(config_dir):
+ makedirs(config_dir)
+
+ permissions(export_dir)
+ permissions(config_dir)
+
+ print("[INFO] Log directory = %s" % log_dir)
+ print("[INFO] Export directory = %s" % export_dir)
+ print("[INFO] Config directory = %s" % config_dir)
+
+
+except os.error as oe:
+ print("[ERROR] Exception in setup log/export directory: %s" % oe)
+ sys.exit(1)
+
+# read in conf file from $HOME/.config/sofirem/sofirem.yaml
+# initialize logger
+try:
+ settings = Settings(False, False)
+ settings_config = settings.read_config_file()
+
+ logger = logging.getLogger("logger")
+
+ # create console handler and set level to debug
+ ch = logging.StreamHandler()
+
+ # rotate the events log every Friday
+ tfh = TimedRotatingFileHandler(
+ event_log_file, encoding="utf-8", delay=False, when="W4"
+ )
+
+ if settings_config:
+ debug_logging_enabled = None
+ debug_logging_enabled = settings_config["Debug Logging"]
+
+ if debug_logging_enabled is not None and debug_logging_enabled is True:
+ logger.setLevel(logging.DEBUG)
+ ch.setLevel(logging.DEBUG)
+ tfh.setLevel(level=logging.DEBUG)
+
+ else:
+ logger.setLevel(logging.INFO)
+ ch.setLevel(logging.INFO)
+ tfh.setLevel(level=logging.INFO)
+ else:
+ logger.setLevel(logging.INFO)
+ ch.setLevel(logging.INFO)
+ tfh.setLevel(level=logging.INFO)
+
+ # create formatter
+ formatter = logging.Formatter(
+ "%(asctime)s:%(levelname)s > %(message)s", "%Y-%m-%d %H:%M:%S"
+ )
+ # add formatter to ch
+ ch.setFormatter(formatter)
+ tfh.setFormatter(formatter)
+
+ # add ch to logger
+ logger.addHandler(ch)
+
+ # add fh to logger
+ logger.addHandler(tfh)
+
+except Exception as e:
+ print("[ERROR] Failed to setup logger, exception: %s" % e)
+
+
+# on app close create file of installed packages
+def _on_close_create_packages_file():
+ try:
+ logger.info("App closing saving currently installed packages to file")
+ packages_file = "%s-packages.txt" % datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
+ logger.info("Saving in %s%s" % (log_dir, packages_file))
+ cmd = ["pacman", "-Q"]
+
+ with subprocess.Popen(
+ cmd,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
+ bufsize=1,
+ universal_newlines=True,
+ ) as process:
+ with open("%s/%s" % (log_dir, packages_file), "w") as f:
+ for line in process.stdout:
+ f.write("%s" % line)
+ except Exception as e:
+ logger.error("Exception in on_close_create_packages_file(): %s" % e)
+
+
+# =====================================================
+# GLOBAL FUNCTIONS
+# =====================================================
+
+
+def _get_position(lists, value):
+ data = [string for string in lists if value in string]
+ position = lists.index(data[0])
+ return position
+
+
+def is_file_stale(filepath, stale_days, stale_hours, stale_minutes):
+ # first, lets obtain the datetime of the day that we determine data to be "stale"
+ now = datetime.now()
+ # For the purposes of this, we are assuming that one would have the app open longer than 5 minutes if installing.
+ stale_datetime = now - timedelta(
+ days=stale_days, hours=stale_hours, minutes=stale_minutes
+ )
+ # Check to see if the file path is in existence.
+ if os.path.exists(filepath):
+ # if the file exists, when was it made?
+ file_created = datetime.fromtimestamp(os.path.getctime(filepath))
+ # file is older than the time delta identified above
+ if file_created < stale_datetime:
+ return True
+ return False
+
+
+# =====================================================
+# PACMAN SYNC PACKAGE DB
+# =====================================================
+def sync_package_db():
+ try:
+ sync_str = ["pacman", "-Sy"]
+ logger.info("Synchronizing pacman package databases")
+ process_sync = subprocess.run(
+ sync_str,
+ shell=False,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
+ timeout=process_timeout,
+ )
+
+ if process_sync.returncode == 0:
+ return None
+ else:
+ if process_sync.stdout:
+ out = str(process_sync.stdout.decode("utf-8"))
+ logger.error(out)
+
+ return out
+
+ except Exception as e:
+ logger.error("Exception in sync_package_db(): %s" % e)
+
+
+# =====================================================
+# PACMAN SYNC FILES DB
+# =====================================================
+
+
+def sync_file_db():
+ try:
+ sync_str = ["pacman", "-Fy"]
+ logger.info("Synchronizing pacman file database")
+ process_sync = subprocess.run(
+ sync_str,
+ shell=False,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
+ timeout=process_timeout,
+ )
+
+ if process_sync.returncode == 0:
+ return None
+ else:
+ if process_sync.stdout:
+ out = str(process_sync.stdout.decode("utf-8"))
+ logger.error(out)
+
+ return out
+
+ except Exception as e:
+ logger.error("Exception in sync_file_db(): %s" % e)
+
+
+# =====================================================
+# PACMAN INSTALL/UNINSTALL PROCESS
+# =====================================================
+
+
+# this is run inside a separate thread
+def start_subprocess(self, cmd, progress_dialog, action, pkg, widget):
+ try:
+ self.switch_package_version.set_sensitive(False)
+ self.switch_arco_keyring.set_sensitive(False)
+ self.switch_arco_mirrorlist.set_sensitive(False)
+
+ widget.set_sensitive(False)
+
+ # store process std out into a list, if there are errors display to user once the process completes
+ process_stdout_lst = []
+ process_stdout_lst.append("Command = %s\n\n" % " ".join(cmd))
+
+ with subprocess.Popen(
+ cmd,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
+ bufsize=1,
+ universal_newlines=True,
+ ) as process:
+ if progress_dialog is not None:
+ progress_dialog.pkg_dialog_closed = False
+
+ self.in_progress = True
+
+ if (
+ progress_dialog is not None
+ and progress_dialog.pkg_dialog_closed is False
+ ):
+ line = (
+ "Pacman is processing the %s of package %s \n\nCommand running = %s\n\n"
+ % (action, pkg.name, " ".join(cmd))
+ )
+ GLib.idle_add(
+ update_progress_textview,
+ self,
+ line,
+ progress_dialog,
+ priority=GLib.PRIORITY_DEFAULT,
+ )
+
+ logger.debug("Pacman is now processing the request")
+
+ # poll for the process to complete
+ # read stdout as it comes in, update the progress textview
+
+ # poll() Check if child process has terminated.
+ # Set and return returncode attribute. Otherwise, returns None.
+
+ while True:
+ if process.poll() is not None:
+ break
+
+ if (
+ progress_dialog is not None
+ and progress_dialog.pkg_dialog_closed is False
+ ):
+ for line in process.stdout:
+ GLib.idle_add(
+ update_progress_textview,
+ self,
+ line,
+ progress_dialog,
+ priority=GLib.PRIORITY_DEFAULT,
+ )
+ process_stdout_lst.append(line)
+
+ time.sleep(0.3)
+ else:
+ # increase wait time to reduce cpu load, no textview updates required since dialog is closed
+ # since the progress dialog window is closed, capture errors and then later display it
+ for line in process.stdout:
+ process_stdout_lst.append(line)
+ time.sleep(1)
+
+ returncode = None
+ returncode = process.poll()
+
+ # logger.debug("Pacman process return code = %s" % returncode)
+
+ if returncode is not None:
+ logger.info("Pacman process completed running = [ %s ]" % " ".join(cmd))
+
+ GLib.idle_add(
+ refresh_ui,
+ self,
+ action,
+ widget,
+ pkg,
+ progress_dialog,
+ process_stdout_lst,
+ priority=GLib.PRIORITY_DEFAULT,
+ )
+ else:
+ # an error happened during the pacman transaction
+ logger.error(
+ "Pacman process failed when running = [ %s ]" % " ".join(cmd)
+ )
+
+ except TimeoutError as t:
+ logger.error("TimeoutError in %s start_subprocess(): %s" % (action, t))
+ process.terminate()
+ if progress_dialog is not None:
+ progress_dialog.btn_package_progress_close.set_sensitive(True)
+ self.switch_package_version.set_sensitive(True)
+ self.switch_arco_keyring.set_sensitive(True)
+ self.switch_arco_mirrorlist.set_sensitive(True)
+
+ except SystemError as s:
+ logger.error("SystemError in %s start_subprocess(): %s" % (action, s))
+ process.terminate()
+ if progress_dialog is not None:
+ progress_dialog.btn_package_progress_close.set_sensitive(True)
+ self.switch_package_version.set_sensitive(True)
+ self.switch_arco_keyring.set_sensitive(True)
+ self.switch_arco_mirrorlist.set_sensitive(True)
+
+
+# refresh ui components, once the process completes
+# show notification dialog to user if errors are encountered during package install/uninstall
+def refresh_ui(self, action, switch, pkg, progress_dialog, process_stdout_lst):
+ self.switch_package_version.set_sensitive(True)
+ self.switch_arco_keyring.set_sensitive(True)
+ self.switch_arco_mirrorlist.set_sensitive(True)
+
+ logger.debug("Checking if package %s is installed" % pkg.name)
+ installed = check_package_installed(pkg.name)
+
+ if progress_dialog is not None:
+ progress_dialog.btn_package_progress_close.set_sensitive(True)
+
+ if installed is True and action == "install":
+ logger.debug("Toggle switch state = True")
+ switch.set_sensitive(True)
+ switch.set_state(True)
+ switch.set_active(True)
+
+ if progress_dialog is not None:
+ if progress_dialog.pkg_dialog_closed is False:
+ progress_dialog.set_title("Package install for %s completed" % pkg.name)
+
+ progress_dialog.infobar.set_name("infobar_info")
+
+ content = progress_dialog.infobar.get_content_area()
+ if content is not None:
+ for widget in content.get_children():
+ content.remove(widget)
+
+ lbl_install = Gtk.Label(xalign=0, yalign=0)
+ lbl_install.set_markup("Package %s installed" % pkg.name)
+
+ content.add(lbl_install)
+
+ if self.timeout_id is not None:
+ GLib.source_remove(self.timeout_id)
+ self.timeout_id = None
+
+ self.timeout_id = GLib.timeout_add(
+ 100, reveal_infobar, self, progress_dialog
+ )
+
+ if installed is False and action == "install":
+ logger.debug("Toggle switch state = False")
+
+ if progress_dialog is not None:
+ # install failed/terminated
+ switch.set_sensitive(True)
+ switch.set_state(False)
+ switch.set_active(False)
+
+ if progress_dialog.pkg_dialog_closed is False:
+ progress_dialog.set_title("Package install for %s failed" % pkg.name)
+
+ progress_dialog.infobar.set_name("infobar_error")
+
+ content = progress_dialog.infobar.get_content_area()
+ if content is not None:
+ for widget in content.get_children():
+ content.remove(widget)
+
+ lbl_install = Gtk.Label(xalign=0, yalign=0)
+ lbl_install.set_markup(
+ "Package %s install failed" % pkg.name
+ )
+
+ content.add(lbl_install)
+
+ if self.timeout_id is not None:
+ GLib.source_remove(self.timeout_id)
+ self.timeout_id = None
+
+ self.timeout_id = GLib.timeout_add(
+ 100, reveal_infobar, self, progress_dialog
+ )
+ else:
+ logger.debug(" ".join(process_stdout_lst))
+
+ message_dialog = MessageDialog(
+ "Errors occurred during install",
+ "Errors occurred install for %s failed" % pkg.name,
+ "Pacman failed to install package %s\n" % pkg.name,
+ " ".join(process_stdout_lst),
+ "error",
+ True,
+ )
+
+ message_dialog.show_all()
+ result = message_dialog.run()
+ message_dialog.destroy()
+ elif progress_dialog is None or progress_dialog.pkg_dialog_closed is True:
+ # the package progress dialog has been closed, but notify user package failed to install
+
+ if (
+ "error: failed to init transaction (unable to lock database)\n"
+ in process_stdout_lst
+ ):
+ # at this point the package install is stuck behind another pacman transaction so put this onto the holding queue
+
+ # logger.debug(" ".join(process_stdout_lst))
+
+ if progress_dialog is None:
+ logger.debug("Adding package to holding queue")
+ if self.display_package_progress is False:
+ inst_str = [
+ "pacman",
+ "-S",
+ pkg.name,
+ "--needed",
+ "--noconfirm",
+ ]
+ self.pkg_holding_queue.put(
+ (
+ pkg,
+ action,
+ switch,
+ inst_str,
+ progress_dialog,
+ ),
+ )
+ else:
+ logger.debug(" ".join(process_stdout_lst))
+ switch.set_sensitive(True)
+ switch.set_state(False)
+ switch.set_active(False)
+
+ proc = fn.get_pacman_process()
+
+ message_dialog = MessageDialog(
+ "Warning",
+ "Sofirem cannot proceed pacman lockfile found",
+ "Pacman cannot lock the db, a lockfile is found inside %s"
+ % fn.pacman_lockfile,
+ "Pacman is running: %s" % proc,
+ "warning",
+ False,
+ )
+
+ message_dialog.show_all()
+ result = message_dialog.run()
+ message_dialog.destroy()
+
+ # progress dialog is closed show message dialog with error
+ elif "error: target not found: %s\n" % pkg.name in process_stdout_lst:
+ switch.set_sensitive(True)
+ switch.set_state(False)
+ switch.set_active(False)
+
+ message_dialog = MessageDialog(
+ "Error",
+ "Pacman repository error: package '%s' was not found" % pkg.name,
+ "Sofirem cannot process the request",
+ "Are the correct pacman mirrorlists configured ?",
+ "error",
+ False,
+ )
+
+ message_dialog.show_all()
+ result = message_dialog.run()
+ message_dialog.destroy()
+
+ else:
+ # logger.debug(" ".join(process_stdout_lst))
+
+ switch.set_sensitive(True)
+ switch.set_state(False)
+ switch.set_active(False)
+
+ message_dialog = MessageDialog(
+ "Errors occurred during install",
+ "Errors occurred install for %s failed" % pkg.name,
+ "Pacman failed to install package %s\n" % pkg.name,
+ " ".join(process_stdout_lst),
+ "error",
+ True,
+ )
+
+ message_dialog.show_all()
+ result = message_dialog.run()
+ message_dialog.destroy()
+
+ if installed is False and action == "uninstall":
+ logger.debug("Toggle switch state = False")
+ switch.set_sensitive(True)
+ switch.set_state(False)
+ switch.set_active(False)
+
+ if progress_dialog is not None:
+ if progress_dialog.pkg_dialog_closed is False:
+ progress_dialog.set_title(
+ "Package uninstall for %s completed" % pkg.name
+ )
+ progress_dialog.infobar.set_name("infobar_info")
+ content = progress_dialog.infobar.get_content_area()
+ if content is not None:
+ for widget in content.get_children():
+ content.remove(widget)
+
+ lbl_install = Gtk.Label(xalign=0, yalign=0)
+ lbl_install.set_markup("Package %s uninstalled" % pkg.name)
+
+ content.add(lbl_install)
+
+ if self.timeout_id is not None:
+ GLib.source_remove(self.timeout_id)
+ self.timeout_id = None
+
+ self.timeout_id = GLib.timeout_add(
+ 100, reveal_infobar, self, progress_dialog
+ )
+
+ if installed is True and action == "uninstall":
+ # uninstall failed/terminated
+ switch.set_sensitive(True)
+ switch.set_state(True)
+ switch.set_active(True)
+
+ if progress_dialog is not None:
+ if progress_dialog.pkg_dialog_closed is False:
+ progress_dialog.set_title("Package uninstall for %s failed" % pkg.name)
+ progress_dialog.infobar.set_name("infobar_error")
+
+ content = progress_dialog.infobar.get_content_area()
+ if content is not None:
+ for widget in content.get_children():
+ content.remove(widget)
+
+ lbl_install = Gtk.Label(xalign=0, yalign=0)
+ lbl_install.set_markup(
+ "Package %s uninstall failed" % pkg.name
+ )
+
+ content.add(lbl_install)
+
+ if self.timeout_id is not None:
+ GLib.source_remove(self.timeout_id)
+ self.timeout_id = None
+
+ self.timeout_id = GLib.timeout_add(
+ 100, reveal_infobar, self, progress_dialog
+ )
+
+ elif progress_dialog is None or progress_dialog.pkg_dialog_closed is True:
+ # the package progress dialog has been closed, but notify user package failed to uninstall
+
+ if (
+ "error: failed to init transaction (unable to lock database)\n"
+ in process_stdout_lst
+ ):
+ logger.error(" ".join(process_stdout_lst))
+
+ else:
+ message_dialog = MessageDialog(
+ "Errors occurred during uninstall",
+ "Errors occurred uninstall of %s failed" % pkg.name,
+ "Pacman failed to uninstall package %s\n" % pkg.name,
+ " ".join(process_stdout_lst),
+ "error",
+ True,
+ )
+
+ message_dialog.show_all()
+ result = message_dialog.run()
+ message_dialog.destroy()
+
+
+# update progress textview using stdout from the pacman process running
+
+
+def update_progress_textview(self, line, progress_dialog):
+ if (
+ progress_dialog is not None
+ and progress_dialog.pkg_dialog_closed is False
+ and self.in_progress is True
+ ):
+ buffer = progress_dialog.package_progress_textview.get_buffer()
+ if len(line) > 0 or buffer is None:
+ buffer.insert(buffer.get_end_iter(), "%s" % line, len("%s" % line))
+
+ text_mark_end = buffer.create_mark("\nend", buffer.get_end_iter(), False)
+
+ # scroll to the end of the textview
+ progress_dialog.package_progress_textview.scroll_mark_onscreen(
+ text_mark_end
+ )
+ else:
+ # dialog window is closed
+ line = None
+ return False
+
+
+# =====================================================
+# APP INSTALLATION
+# =====================================================
+def install(self):
+ pkg, action, widget, inst_str, progress_dialog = self.pkg_queue.get()
+
+ try:
+ if action == "install":
+ # path = base_dir + "/cache/installed.lst"
+ logger.debug("Running inside install thread")
+ logger.info("Installing package %s" % pkg.name)
+ logger.debug(inst_str)
+
+ # run pacman process inside a thread
+
+ th_subprocess_install = Thread(
+ name="thread_subprocess",
+ target=start_subprocess,
+ args=(
+ self,
+ inst_str,
+ progress_dialog,
+ action,
+ pkg,
+ widget,
+ ),
+ daemon=True,
+ )
+
+ th_subprocess_install.start()
+
+ logger.debug("Thread: subprocess install started")
+
+ except Exception as e:
+ # deactivate switch widget, install failed
+ widget.set_state(False)
+ if progress_dialog is not None:
+ progress_dialog.btn_package_progress_close.set_sensitive(True)
+ logger.error("Exception in install(): %s" % e)
+ finally:
+ # task completed
+ self.pkg_queue.task_done()
+
+
+# =====================================================
+# APP UNINSTALLATION
+# =====================================================
+def uninstall(self):
+ pkg, action, widget, uninst_str, progress_dialog = self.pkg_queue.get()
+
+ try:
+ if action == "uninstall":
+ # path = base_dir + "/cache/installed.lst"
+ logger.debug("Running inside uninstall thread")
+ logger.info("Uninstalling package %s" % pkg.name)
+ logger.debug(uninst_str)
+
+ # run pacman process inside a thread
+
+ th_subprocess_uninstall = Thread(
+ name="thread_subprocess",
+ target=start_subprocess,
+ args=(
+ self,
+ uninst_str,
+ progress_dialog,
+ action,
+ pkg,
+ widget,
+ ),
+ daemon=True,
+ )
+
+ # is there a pacman lockfile, wait for it before uninstalling
+ # while check_pacman_lockfile():
+ # time.sleep(0.2)
+
+ th_subprocess_uninstall.start()
+
+ logger.debug("Thread: subprocess uninstall started")
+
+ except Exception as e:
+ widget.set_state(True)
+ if progress_dialog is not None:
+ progress_dialog.btn_package_progress_close.set_sensitive(True)
+ logger.error("Exception in uninstall(): %s" % e)
+ finally:
+ self.pkg_queue.task_done()
+
+
+# =====================================================
+# SEARCH INDEXING
+# =====================================================
+
+
+# store a list of package metadata into memory for fast retrieval
+def store_packages():
+ path = base_dir + "/yaml/"
+ cache = base_dir + "/cache/yaml-packages.lst"
+ yaml_files = []
+ packages = []
+
+ category_dict = {}
+
+ try:
+ # get latest package version info
+
+ package_metadata = get_all_package_info()
+
+ # get a list of yaml files
+ for file in os.listdir(path):
+ if file.endswith(".yaml"):
+ yaml_files.append(file)
+
+ if len(yaml_files) > 0:
+ for yaml_file in yaml_files:
+ cat_desc = ""
+ package_name = ""
+ package_cat = ""
+
+ category_name = yaml_file[11:-5].strip().capitalize()
+
+ # read contents of each yaml file
+
+ with open(path + yaml_file, "r") as yaml:
+ content = yaml.readlines()
+ for line in content:
+ if line.startswith(" packages:"):
+ continue
+ elif line.startswith(" description: "):
+ # Set the label text for the description line
+ subcat_desc = (
+ line.strip(" description: ")
+ .strip()
+ .strip('"')
+ .strip("\n")
+ .strip()
+ )
+ elif line.startswith("- name:"):
+ # category
+
+ subcat_name = (
+ line.strip("- name: ")
+ .strip()
+ .strip('"')
+ .strip("\n")
+ .strip()
+ )
+ elif line.startswith(" - "):
+ # add the package to the packages list
+
+ package_name = line.strip(" - ").strip()
+
+ # get the package description
+ # package_desc = obtain_pkg_description(package_name)
+
+ package_version = "Unknown"
+ package_description = "Unknown"
+
+ for data in package_metadata:
+ if data["name"] == package_name:
+ package_version = data["version"]
+ package_description = data["description"]
+
+ break
+
+ if package_description == "Unknown":
+ package_description = obtain_pkg_description(package_name)
+
+ package = Package(
+ package_name,
+ package_description,
+ category_name,
+ subcat_name,
+ subcat_desc,
+ package_version,
+ )
+
+ packages.append(package)
+
+ # filter the results so that each category holds a list of package
+
+ category_name = None
+ packages_cat_lst = []
+ for pkg in packages:
+ if category_name == pkg.category:
+ packages_cat_lst.append(pkg)
+ category_dict[category_name] = packages_cat_lst
+ elif category_name is None:
+ packages_cat_lst.append(pkg)
+ category_dict[pkg.category] = packages_cat_lst
+ else:
+ # reset packages, new category
+ packages_cat_lst = []
+
+ packages_cat_lst.append(pkg)
+
+ category_dict[pkg.category] = packages_cat_lst
+
+ category_name = pkg.category
+
+ # Print dictionary for debugging
+
+ # for key in category_dict.keys():
+ # print("Category = %s" % key)
+ # pkg_list = category_dict[key]
+ #
+ # for pkg in pkg_list:
+ # print("%s" % pkg.name)
+
+ sorted_dict = None
+
+ sorted_dict = dict(sorted(category_dict.items()))
+
+ if sorted_dict is None:
+ logger.error(
+ "An error occurred during sort of stored packages in store_packages()"
+ )
+ else:
+ with open(cache, "w", encoding="UTF-8") as f:
+ for key in category_dict.keys():
+ pkg_list = category_dict[key]
+
+ for pkg in pkg_list:
+ f.write("%s\n" % pkg.name)
+
+ return sorted_dict
+ except Exception as e:
+ logger.error("Exception in store_packages() : %s" % e)
+ sys.exit(1)
+
+
+def get_package_description(package):
+ query_str = ["pacman", "-Si", package]
+
+ try:
+ with subprocess.Popen(
+ query_str,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
+ bufsize=1,
+ universal_newlines=True,
+ ) as process:
+ while True:
+ if process.poll() is not None:
+ break
+
+ returncode = None
+ returncode = process.poll()
+
+ if returncode is not None:
+ for line in process.stdout:
+ if "Description :" in line.strip():
+ return line.replace(" ", "").split("Description:")[1].strip()
+
+ except Exception as e:
+ logger.error("Exception in get_package_description(): %s" % e)
+
+
+# =====================================================
+# PACKAGE VERSIONS
+# =====================================================
+
+
+# get live package name, version info and repo name
+# used in store_packages() and get_installed_package_data()
+def get_all_package_info():
+ query_str = ["pacman", "-Si"]
+
+ try:
+ process_pkg_query = subprocess.Popen(
+ query_str, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE
+ )
+
+ out, err = process_pkg_query.communicate(timeout=process_timeout)
+
+ if process_pkg_query.returncode == 0:
+ if out:
+ package_data = []
+ package_name = "Unknown"
+ package_version = "Unknown"
+ package_description = "Unknown"
+ package_repository = "Unknown"
+
+ for line in out.decode("utf-8").splitlines():
+ package_dict = {}
+ if "Name :" in line.strip():
+ package_name = line.replace(" ", "").split("Name:")[1].strip()
+
+ if "Version :" in line.strip():
+ package_version = (
+ line.replace(" ", "").split("Version:")[1].strip()
+ )
+
+ if "Description :" in line.strip():
+ package_description = line.split("Description :")[1].strip()
+
+ if "Repository :" in line.strip():
+ package_repository = line.split("Repository :")[1].strip()
+
+ package_dict["name"] = package_name
+ package_dict["version"] = package_version
+ package_dict["description"] = package_description
+ package_dict["repository"] = package_repository
+
+ package_data.append(package_dict)
+
+ return package_data
+ else:
+ logger.error("Failed to extract package version information.")
+
+ except Exception as e:
+ logger.error("Exception in get_all_package_info() : %s" % e)
+
+
+# get installed package version, installed date, name to be displayed inside PackageListDialog
+# for export and later import
+def get_installed_package_data(self):
+ # to capture the latest package version
+ latest_package_data = get_all_package_info()
+
+ # query_str = ["pacman", "-Qi"]
+ # query_str = ["pacman", "-Qien"]
+
+ try:
+ installed_packages_list = []
+ pkg_name = None
+ pkg_version = None
+ pkg_install_date = None
+ pkg_installed_size = None
+ pkg_latest_version = None
+
+ with subprocess.Popen(
+ self.pacman_export_cmd,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
+ bufsize=1,
+ universal_newlines=True,
+ ) as process:
+ for line in process.stdout:
+ if "Name :" in line.strip():
+ pkg_name = line.replace(" ", "").split("Name:")[1].strip()
+
+ if "Version :" in line.strip():
+ pkg_version = line.replace(" ", "").split("Version:")[1].strip()
+
+ if "Installed Size :" in line.strip():
+ pkg_installed_size = line.split("Installed Size :")[1].strip()
+
+ if "Install Date :" in line.strip():
+ pkg_install_date = line.split("Install Date :")[1].strip()
+
+ # get the latest version lookup dictionary
+
+ found = False
+ pkg_latest_version = None
+
+ for i in latest_package_data:
+ if i["name"] == pkg_name:
+ pkg_latest_version = i["version"]
+ break
+
+ installed_packages_list.append(
+ (
+ pkg_name,
+ pkg_version,
+ pkg_latest_version,
+ pkg_installed_size,
+ pkg_install_date,
+ )
+ )
+
+ self.pkg_export_queue.put(installed_packages_list)
+
+ # return installed_packages_list
+
+ except Exception as e:
+ logger.error("Exception in get_installed_package_data() : %s" % e)
+
+
+# get list of files installed by a package
+def get_package_files(package_name):
+ try:
+ query_str = ["pacman", "-Fl", package_name]
+ process = subprocess.run(
+ query_str,
+ shell=False,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
+ timeout=process_timeout,
+ )
+
+ if process.returncode == 0:
+ package_files = []
+ for line in process.stdout.decode("utf-8").splitlines():
+ package_files.append((line.split(" ")[1], None))
+
+ return package_files
+ else:
+ return None
+ except Exception as e:
+ logger.error("Exception in get_package_files(): %s" % e)
+
+
+# get key package information which is to be shown inside ProgressDialog
+
+
+def get_package_information(package_name):
+ logger.info("Fetching package information for %s" % package_name)
+
+ try:
+ pkg_name = "Unknown"
+ pkg_version = "Unknown"
+ pkg_repository = "Unknown / pacman mirrorlist not configured"
+ pkg_description = "Unknown"
+ pkg_arch = "Unknown"
+ pkg_url = "Unknown"
+ pkg_depends_on = []
+ pkg_conflicts_with = []
+ pkg_download_size = "Unknown"
+ pkg_installed_size = "Unknown"
+ pkg_build_date = "Unknown"
+ pkg_packager = "Unknown"
+ package_metadata = {}
+
+ query_local_cmd = ["pacman", "-Qi", package_name]
+ query_remote_cmd = ["pacman", "-Si", package_name]
+
+ process_query_remote = subprocess.run(
+ query_remote_cmd,
+ shell=False,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
+ timeout=process_timeout,
+ )
+
+ # added validation on process result
+ if process_query_remote.returncode == 0:
+ for line in process_query_remote.stdout.decode("utf-8").splitlines():
+ if "Name :" in line.strip():
+ pkg_name = line.replace(" ", "").split("Name:")[1].strip()
+
+ if "Version :" in line.strip():
+ pkg_version = line.replace(" ", "").split("Version:")[1].strip()
+
+ if "Repository :" in line.strip():
+ pkg_repository = line.split("Repository :")[1].strip()
+
+ if "Description :" in line.strip():
+ pkg_description = line.split("Description :")[1].strip()
+
+ if "Architecture :" in line.strip():
+ pkg_arch = line.split("Architecture :")[1].strip()
+
+ if "URL :" in line.strip():
+ pkg_url = line.split("URL :")[1].strip()
+
+ if "Depends On :" in line.strip():
+ if line.split("Depends On :")[1].strip() != "None":
+ pkg_depends_on_str = line.split("Depends On :")[1].strip()
+
+ for pkg_dep in pkg_depends_on_str.split(" "):
+ pkg_depends_on.append((pkg_dep, None))
+ else:
+ pkg_depends_on = []
+
+ if "Conflicts With :" in line.strip():
+ if line.split("Conflicts With :")[1].strip() != "None":
+ pkg_conflicts_with_str = line.split("Conflicts With :")[
+ 1
+ ].strip()
+
+ for pkg_con in pkg_conflicts_with_str.split(" "):
+ pkg_conflicts_with.append((pkg_con, None))
+ else:
+ pkg_conflicts_with = []
+
+ if "Download Size :" in line.strip():
+ pkg_download_size = line.split("Download Size :")[1].strip()
+
+ if "Installed Size :" in line.strip():
+ pkg_installed_size = line.split("Installed Size :")[1].strip()
+
+ if "Build Date :" in line.strip():
+ pkg_build_date = line.split("Build Date :")[1].strip()
+
+ if "Packager :" in line.strip():
+ pkg_packager = line.split("Packager :")[1].strip()
+
+ package_metadata["name"] = pkg_name
+ package_metadata["version"] = pkg_version
+ package_metadata["repository"] = pkg_repository
+ package_metadata["description"] = pkg_description
+ package_metadata["arch"] = pkg_arch
+ package_metadata["url"] = pkg_url
+ package_metadata["depends_on"] = pkg_depends_on
+ package_metadata["conflicts_with"] = pkg_conflicts_with
+ package_metadata["download_size"] = pkg_download_size
+ package_metadata["installed_size"] = pkg_installed_size
+ package_metadata["build_date"] = pkg_build_date
+ package_metadata["packager"] = pkg_packager
+
+ return package_metadata
+
+ elif (
+ "error: package '%s' was not found\n" % package_name
+ in process_query_remote.stdout.decode("utf-8")
+ ):
+ return "error: package '%s' was not found" % package_name
+ else:
+ process_query_local = subprocess.run(
+ query_local_cmd,
+ shell=False,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
+ timeout=process_timeout,
+ )
+
+ # added validation on process result
+ if process_query_local.returncode == 0:
+ for line in process_query_local.stdout.decode("utf-8").splitlines():
+ if "Name :" in line.strip():
+ pkg_name = line.replace(" ", "").split("Name:")[1].strip()
+
+ if "Version :" in line.strip():
+ pkg_version = line.replace(" ", "").split("Version:")[1].strip()
+
+ if "Repository :" in line.strip():
+ pkg_repository = line.split("Repository :")[1].strip()
+
+ if "Description :" in line.strip():
+ pkg_description = line.split("Description :")[1].strip()
+
+ if "Architecture :" in line.strip():
+ pkg_arch = line.split("Architecture :")[1].strip()
+
+ if "URL :" in line.strip():
+ pkg_url = line.split("URL :")[1].strip()
+
+ if "Depends On :" in line.strip():
+ if line.split("Depends On :")[1].strip() != "None":
+ pkg_depends_on_str = line.split("Depends On :")[
+ 1
+ ].strip()
+
+ for pkg_dep in pkg_depends_on_str.split(" "):
+ pkg_depends_on.append((pkg_dep, None))
+ else:
+ pkg_depends_on = []
+
+ if "Conflicts With :" in line.strip():
+ if line.split("Conflicts With :")[1].strip() != "None":
+ pkg_conflicts_with_str = line.split("Conflicts With :")[
+ 1
+ ].strip()
+
+ for pkg_con in pkg_conflicts_with_str.split(" "):
+ pkg_conflicts_with.append((pkg_con, None))
+ else:
+ pkg_conflicts_with = []
+
+ if "Download Size :" in line.strip():
+ pkg_download_size = line.split("Download Size :")[1].strip()
+
+ if "Installed Size :" in line.strip():
+ pkg_installed_size = line.split("Installed Size :")[1].strip()
+
+ if "Build Date :" in line.strip():
+ pkg_build_date = line.split("Build Date :")[1].strip()
+
+ if "Packager :" in line.strip():
+ pkg_packager = line.split("Packager :")[1].strip()
+
+ package_metadata["name"] = pkg_name
+ package_metadata["version"] = pkg_version
+ package_metadata["repository"] = pkg_repository
+ package_metadata["description"] = pkg_description
+ package_metadata["arch"] = pkg_arch
+ package_metadata["url"] = pkg_url
+ package_metadata["depends_on"] = pkg_depends_on
+ package_metadata["conflicts_with"] = pkg_conflicts_with
+ package_metadata["download_size"] = pkg_download_size
+ package_metadata["installed_size"] = pkg_installed_size
+ package_metadata["build_date"] = pkg_build_date
+ package_metadata["packager"] = pkg_packager
+
+ return package_metadata
+ else:
+ return None
+ except Exception as e:
+ logger.error("Exception in get_package_information(): %s" % e)
+
+
+# =====================================================
+# APP QUERY
+# =====================================================
+
+
+def get_current_installed():
+ logger.debug("Get currently installed packages")
+ path = base_dir + "/cache/installed.lst"
+
+ # query_str = "pacman -Q > " + path
+ query_str = ["pacman", "-Q"]
+
+ # run the query - using Popen because it actually suits this use case a bit better.
+
+ subprocess_query = subprocess.Popen(
+ query_str,
+ shell=False,
+ stdout=subprocess.PIPE,
+ )
+
+ out, err = subprocess_query.communicate(timeout=process_timeout)
+
+ # added validation on process result
+ if subprocess_query.returncode == 0:
+ file = open(path, "w")
+ for line in out.decode("utf-8"):
+ file.write(line)
+ file.close()
+ else:
+ logger.warning("Failed to run %s" % query_str)
+
+
+def query_pkg(package):
+ try:
+ package = package.strip()
+ path = base_dir + "/cache/installed.lst"
+
+ pacman_localdb = base_dir + "/cache/pacman-localdb"
+
+ if os.path.exists(path):
+ if is_file_stale(path, 0, 0, 30):
+ get_current_installed()
+ # file does NOT exist;
+ else:
+ get_current_installed()
+ # then, open the resulting list in read mode
+ with open(path, "r") as f:
+ # first we need to strip the new line escape sequence to ensure we don't get incorrect outcome
+ pkg = package.strip("\n")
+
+ # If the pkg name appears in the list, then it is installed
+ for line in f:
+ installed = line.split(" ")
+ # We only compare against the name of the package, NOT the version number.
+ if pkg == installed[0]:
+ # file.close()
+ return True
+ # We will only hit here, if the pkg does not match anything in the file.
+ # file.close()
+
+ return False
+ except Exception as e:
+ logger.error("Exception in query_pkg(): %s " % e)
+
+
+# =====================================================
+# PACKAGE DESCRIPTION CACHE AND SEARCH
+# =====================================================
+
+
+def cache(package, path_dir_cache):
+ try:
+ # first we need to strip the new line escape sequence to ensure we don't get incorrect outcome
+ pkg = package.strip()
+ # create the query
+ query_str = ["pacman", "-Si", pkg, " --noconfirm"]
+
+ # run the query - using Popen because it actually suits this use case a bit better.
+
+ process = subprocess.Popen(
+ query_str, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE
+ )
+ out, err = process.communicate()
+
+ # validate the process result
+ if process.returncode == 0:
+ # out, err = process.communicate()
+
+ output = out.decode("utf-8")
+
+ if len(output) > 0:
+ split = output.splitlines()
+
+ # Currently the output of the pacman command above always puts the description on the 4th line.
+ desc = str(split[3])
+ # Ok, so this is a little fancy: there is formatting from the output which we wish to ignore (ends at 19th character)
+ # and there is a remenant of it as the last character - usually a single or double quotation mark, which we also need to ignore
+ description = desc[18:]
+ # writing to a caching file with filename matching the package name
+ filename = path_dir_cache + pkg
+
+ file = open(filename, "w")
+ file.write(description)
+ file.close()
+
+ return description
+ # There are several packages that do not return a valid process return code
+ # Cathing those manually via corrections folder
+ if process.returncode != 0:
+ exceptions = [
+ "florence",
+ "mintstick-bin",
+ "arcolinux-conky-collection-plasma-git",
+ "arcolinux-desktop-trasher-git",
+ "arcolinux-pamac-all",
+ "arcolinux-sddm-simplicity-git",
+ "ttf-hack",
+ "ttf-roboto-mono",
+ "aisleriot",
+ "mailspring",
+ "linux-rt",
+ "linux-rt-headers",
+ "linux-rt-lts",
+ "linux-rt-lts-headers",
+ "arcolinux-sddm-simplicity-git",
+ "kodi-x11",
+ "kodi-addons",
+ "sardi-icons",
+ ]
+ if pkg in exceptions:
+ description = file_lookup(pkg, path_dir_cache + "corrections/")
+ return description
+ return "No Description Found"
+
+ except Exception as e:
+ logger.error("Exception in cache(): %s " % e)
+
+
+# Creating an over-load so that we can use the same function, with slightly different code to get the results we need
+def cache_btn():
+ # fraction = 1 / len(packages)
+ # Non Multithreaded version.
+ packages.sort()
+ number = 1
+ for pkg in packages:
+ logger.debug(str(number) + "/" + str(len(packages)) + ": Caching " + pkg)
+ cache(pkg, path_dir_cache)
+ number = number + 1
+ # progressbar.timeout_id = GLib.timeout_add(50, progressbar.update, fraction)
+
+ logger.debug("Caching applications finished")
+
+ # This will need to be coded to be running multiple processes eventually, since it will be manually invoked.
+ # process the file list
+ # for each file in the list, open the file
+ # process the file ignoring what is not what we need
+ # for each file line processed, we need to invoke the cache function that is not over-ridden.
+
+
+def file_lookup(package, path):
+ # first we need to strip the new line escape sequence to ensure we don't get incorrect outcome
+ pkg = package.strip("\n")
+ output = ""
+ if os.path.exists(path + "corrections/" + pkg):
+ filename = path + "corrections/" + pkg
+ else:
+ filename = path + pkg
+ file = open(filename, "r")
+ output = file.read()
+ file.close()
+ if len(output) > 0:
+ return output
+ return "No Description Found"
+
+
+def obtain_pkg_description(package):
+ # This is a pretty simple function now, decide how to get the information, then get it.
+ # processing variables.
+ output = ""
+ path = base_dir + "/cache/"
+
+ # First we need to determine whether to pull from cache or pacman.
+ if os.path.exists(path + package.strip("\n")):
+ output = file_lookup(package, path)
+
+ # file doesn't exist, so create a blank copy
+ else:
+ output = cache(package, path)
+ # Add the package in question to the global variable, in case recache is needed
+ packages.append(package)
+ return output
+
+
+def restart_program():
+ os.unlink("/tmp/sofirem.lock")
+ python = sys.executable
+ os.execl(python, python, *sys.argv)
+
+
+# =====================================================
+# MONITOR PACMAN LOG FILE
+# =====================================================
+
+
+# write lines from the pacman log onto a queue, this is called from a non-blocking thread
+def add_pacmanlog_queue(self):
+ try:
+ lines = []
+ with open(pacman_logfile, "r", encoding="utf-8") as f:
+ while True:
+ line = f.readline()
+ if line:
+ lines.append(line.encode("utf-8"))
+ self.pacmanlog_queue.put(lines)
+ else:
+ time.sleep(0.5)
+
+ except Exception as e:
+ logger.error("Exception in add_pacmanlog_queue() : %s" % e)
+ finally:
+ logger.debug("No new lines found inside the pacman log file")
+
+
+# start log timer to update the textview called from a non-blocking thread
+def start_log_timer(self, window_pacmanlog):
+ while True:
+ if window_pacmanlog.start_logtimer is False:
+ logger.debug("Stopping Pacman log monitoring timer")
+ return False
+
+ GLib.idle_add(update_textview_pacmanlog, self, priority=GLib.PRIORITY_DEFAULT)
+ time.sleep(2)
+
+
+# update the textview component with new lines from the pacman log file
+
+
+# To fix: Gtk-CRITICAL **: gtk_text_buffer_emit_insert: assertion 'g_utf8_validate (text, len, NULL)' failed
+# Make sure the line read from the pacman log file is encoded in utf-8
+# Then decode the line when inserting inside the buffer
+
+
+def update_textview_pacmanlog(self):
+ lines = self.pacmanlog_queue.get()
+
+ try:
+ buffer = self.textbuffer_pacmanlog
+ if len(lines) > 0:
+ end_iter = buffer.get_end_iter()
+ for line in lines:
+ if len(line) > 0:
+ buffer.insert(
+ end_iter,
+ line.decode("utf-8"),
+ len(line),
+ )
+
+ except Exception as e:
+ logger.error("Exception in update_textview_pacmanlog() : %s" % e)
+ finally:
+ self.pacmanlog_queue.task_done()
+
+ if len(lines) > 0:
+ text_mark_end = buffer.create_mark("end", buffer.get_end_iter(), False)
+ # auto-scroll the textview to the bottom as new content is added
+
+ self.textview_pacmanlog.scroll_mark_onscreen(text_mark_end)
+
+ lines.clear()
+
+
+# =====================================================
+# USER SEARCH
+# =====================================================
+
+
+def search(self, term):
+ try:
+ logger.info('Searching for: "%s"' % term)
+
+ pkg_matches = []
+
+ category_dict = {}
+
+ whitespace = False
+
+ if term.strip():
+ whitespace = True
+
+ for pkg_list in self.packages.values():
+ for pkg in pkg_list:
+ if whitespace:
+ for te in term.split(" "):
+ if (
+ te.lower() in pkg.name.lower()
+ or te.lower() in pkg.description.lower()
+ ):
+ # only unique name matches
+ if pkg not in pkg_matches:
+ pkg_matches.append(
+ pkg,
+ )
+ else:
+ if (
+ term.lower() in pkg.name.lower()
+ or term.lower() in pkg.description.lower()
+ ):
+ pkg_matches.append(
+ pkg,
+ )
+
+ # filter the results so that each category holds a list of package
+
+ category_name = None
+ packages_cat = []
+ for pkg_match in pkg_matches:
+ if category_name == pkg_match.category:
+ packages_cat.append(pkg_match)
+ category_dict[category_name] = packages_cat
+ elif category_name is None:
+ packages_cat.append(pkg_match)
+ category_dict[pkg_match.category] = packages_cat
+ else:
+ # reset packages, new category
+ packages_cat = []
+
+ packages_cat.append(pkg_match)
+
+ category_dict[pkg_match.category] = packages_cat
+
+ category_name = pkg_match.category
+
+ # debug console output to display package info
+ """
+ # print out number of results found from each category
+ print("[DEBUG] %s Search results.." % datetime.now().strftime("%H:%M:%S"))
+
+ for category in sorted(category_dict):
+ category_res_len = len(category_dict[category])
+ print("[DEBUG] %s %s = %s" %(
+ datetime.now().strftime("%H:%M:%S"),
+ category,
+ category_res_len,
+ )
+ )
+ """
+
+ # sort dictionary so the category names are displayed in alphabetical order
+ sorted_dict = None
+
+ if len(category_dict) > 0:
+ sorted_dict = dict(sorted(category_dict.items()))
+ self.search_queue.put(
+ sorted_dict,
+ )
+ else:
+ self.search_queue.put(
+ None,
+ )
+
+ except Exception as e:
+ logger.error("Exception in search(): %s", e)
+
+
+# =====================================================
+# ARCOLINUX REPOS, KEYS AND MIRRORS
+# =====================================================
+
+
+def append_repo(text):
+ """Append a new repo"""
+ try:
+ with open(pacman_conf, "a", encoding="utf-8") as f:
+ f.write("\n\n")
+ f.write(text)
+ except Exception as e:
+ logger.error("Exception in append_repo(): %s" % e)
+
+
+def repo_exist(value):
+ """check repo_exists"""
+ with open(pacman_conf, "r", encoding="utf-8") as f:
+ lines = f.readlines()
+ f.close()
+
+ for line in lines:
+ if value in line:
+ return True
+ return False
+
+
+def install_arco_keyring():
+ try:
+ keyring = base_dir + "/packages/arcolinux-keyring/"
+ file = os.listdir(keyring)
+ cmd_str = [
+ "pacman",
+ "-U",
+ keyring + str(file).strip("[]'"),
+ "--noconfirm",
+ ]
+
+ logger.debug("%s" % " ".join(cmd_str))
+
+ with subprocess.Popen(
+ cmd_str,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
+ bufsize=1,
+ universal_newlines=True,
+ ) as process:
+ process.wait(process_timeout)
+
+ output = []
+
+ for line in process.stdout:
+ output.append(line)
+
+ if process.returncode == 0:
+ return 0
+
+ else:
+ if len(output) == 0:
+ output.append("Error: install of ArcoLinux keyring failed")
+
+ logger.error(" ".join(output))
+
+ result_err = {}
+
+ result_err["cmd_str"] = cmd_str
+ result_err["output"] = output
+
+ return result_err
+ except Exception as e:
+ logger.error("Exception in install_arco_keyring(): %s" % e)
+ result_err = {}
+
+ result_err["cmd_str"] = cmd_str
+ result_err["output"] = e
+
+ return result_err
+
+
+def remove_arco_keyring():
+ try:
+ cmd_str = ["pacman", "-Rdd", "arcolinux-keyring", "--noconfirm"]
+ with subprocess.Popen(
+ cmd_str,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
+ bufsize=1,
+ universal_newlines=True,
+ ) as process:
+ process.wait(process_timeout)
+
+ output = []
+
+ for line in process.stdout:
+ output.append(line)
+
+ if process.returncode == 0:
+ return 0
+
+ else:
+ if len(output) == 0:
+ output.append("Error: removal of ArcoLinux keyring failed")
+
+ logger.error(" ".join(output))
+
+ result_err = {}
+
+ result_err["cmd_str"] = cmd_str
+ result_err["output"] = output
+
+ return result_err
+
+ except Exception as e:
+ logger.error("Exception in remove_arco_keyring(): %s" % e)
+
+ result_err = {}
+
+ result_err["cmd_str"] = cmd_str
+ result_err["output"] = e
+
+ return result_err
+
+
+def install_arco_mirrorlist():
+ try:
+ mirrorlist = base_dir + "/packages/arcolinux-mirrorlist/"
+ file = os.listdir(mirrorlist)
+ cmd_str = [
+ "pacman",
+ "-U",
+ mirrorlist + str(file).strip("[]'"),
+ "--noconfirm",
+ ]
+
+ logger.debug("%s" % " ".join(cmd_str))
+ with subprocess.Popen(
+ cmd_str,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
+ bufsize=1,
+ universal_newlines=True,
+ ) as process:
+ process.wait(process_timeout)
+
+ output = []
+
+ for line in process.stdout:
+ output.append(line)
+
+ if process.returncode == 0:
+ return 0
+
+ else:
+ if len(output) == 0:
+ output.append("Error: install of ArcoLinux mirrorlist failed")
+
+ logger.error(" ".join(output))
+
+ result_err = {}
+
+ result_err["cmd_str"] = cmd_str
+ result_err["output"] = output
+
+ return result_err
+ except Exception as e:
+ logger.error("Exception in install_arco_mirrorlist(): %s" % e)
+
+ result_err = {}
+
+ result_err["cmd_str"] = cmd_str
+ result_err["output"] = output
+
+ return result_err
+
+
+def remove_arco_mirrorlist():
+ try:
+ cmd_str = ["pacman", "-Rdd", "arcolinux-mirrorlist-git", "--noconfirm"]
+ logger.debug("%s" % " ".join(cmd_str))
+ with subprocess.Popen(
+ cmd_str,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
+ bufsize=1,
+ universal_newlines=True,
+ ) as process:
+ process.wait(process_timeout)
+
+ output = []
+
+ for line in process.stdout:
+ output.append(line)
+
+ if process.returncode == 0:
+ return 0
+
+ else:
+ if len(output) == 0:
+ output.append("Error: removal of ArcoLinux mirrorlist failed")
+
+ logger.error(" ".join(output))
+
+ result_err = {}
+
+ result_err["cmd_str"] = cmd_str
+ result_err["output"] = output
+
+ return result_err
+
+ except Exception as e:
+ logger.error("Exception in remove_arco_mirrorlist(): %s" % e)
+
+ result_err = {}
+
+ result_err["cmd_str"] = cmd_str
+ result_err["output"] = e
+
+ return result_err
+
+
+def add_arco_repos():
+ logger.info("Adding ArcoLinux repos on %s" % distr)
+ try:
+ # first check if arco repos are already inside pacman conf file
+
+ if verify_arco_pacman_conf() is False:
+ # take backup of existing pacman.conf file
+
+ if os.path.exists(pacman_conf):
+ shutil.copy(pacman_conf, pacman_conf_backup)
+
+ # read existing contents from pacman.conf file
+
+ logger.info("Reading from %s" % pacman_conf)
+
+ lines = []
+
+ with open(pacman_conf, "r", encoding="utf-8") as r:
+ lines = r.readlines()
+
+ # check for existing ArcoLinux entries
+ if len(lines) > 0:
+ arco_test_repo_found = False
+ arco_repo_found = False
+ arco_3rd_party_repo_found = False
+ arco_xlrepo_found = False
+
+ for line in lines:
+ if "#" in line.strip():
+ if arco_test_repo[0].replace("#", "") in line.strip():
+ arco_test_repo_found = True
+
+ if arco_repo[0].replace("#", "") in line.strip():
+ arco_repo_found = True
+ index = lines.index(line)
+
+ del lines[index]
+ lines.insert(index, arco_repo[0])
+
+ index += 1
+
+ del lines[index]
+ lines.insert(index, arco_repo[1])
+
+ index += 1
+
+ del lines[index]
+ lines.insert(index, arco_repo[2])
+
+ if arco_3rd_party_repo[0].replace("#", "") in line.strip():
+ arco_3rd_party_repo_found = True
+ index = lines.index(line)
+
+ del lines[index]
+ lines.insert(index, arco_3rd_party_repo[0])
+
+ index += 1
+
+ del lines[index]
+ lines.insert(index, arco_3rd_party_repo[1])
+
+ index += 1
+
+ del lines[index]
+ lines.insert(index, arco_3rd_party_repo[2])
+
+ if arco_xlrepo[0].replace("#", "") in line.strip():
+ arco_xlrepo_found = True
+ index = lines.index(line)
+
+ del lines[index]
+ lines.insert(index, arco_xlrepo[0])
+
+ index += 1
+
+ del lines[index]
+ lines.insert(index, arco_xlrepo[1])
+
+ index += 1
+
+ del lines[index]
+ lines.insert(index, arco_xlrepo[2])
+
+ if line.strip() == arco_test_repo[0]:
+ arco_test_repo_found = True
+
+ if line.strip() == arco_repo[0]:
+ arco_repo_found = True
+
+ if line.strip() == arco_3rd_party_repo[0]:
+ arco_3rd_party_repo_found = True
+
+ if line.strip() == arco_xlrepo[0]:
+ arco_xlrepo_found = True
+
+ if arco_test_repo_found is False:
+ lines.append("\n")
+
+ for arco_test_repo_line in arco_test_repo:
+ lines.append(arco_test_repo_line)
+
+ if arco_repo_found is False:
+ lines.append("\n")
+
+ for arco_repo_line in arco_repo:
+ lines.append(arco_repo_line)
+
+ if arco_3rd_party_repo_found is False:
+ lines.append("\n")
+
+ for arco_3rd_party_repo_line in arco_3rd_party_repo:
+ lines.append(arco_3rd_party_repo_line)
+
+ if arco_xlrepo_found is False:
+ lines.append("\n")
+
+ for arco_xlrepo_line in arco_xlrepo:
+ lines.append(arco_xlrepo_line)
+
+ logger.info("[Add ArcoLinux repos] Writing to %s" % pacman_conf)
+
+ if len(lines) > 0:
+ with open(pacman_conf, "w", encoding="utf-8") as w:
+ for l in lines:
+ w.write(l.strip() + "\n")
+
+ w.flush()
+
+ return 0
+
+ else:
+ logger.error("Failed to process %s" % pacman_conf)
+
+ else:
+ logger.error("Failed to read %s" % pacman_conf)
+ else:
+ logger.info("ArcoLinux repos already setup inside pacman conf file")
+ return 0
+
+ except Exception as e:
+ logger.error("Exception in add_arco_repos(): %s" % e)
+ return e
+
+
+def remove_arco_repos():
+ # remove the ArcoLinux repos in /etc/pacman.conf
+ try:
+ # check for existing ArcoLinux entries and remove
+ if verify_arco_pacman_conf() is True:
+ if os.path.exists(pacman_conf):
+ shutil.copy(pacman_conf, pacman_conf_backup)
+
+ logger.info("Reading from %s" % pacman_conf)
+
+ lines = []
+
+ with open(pacman_conf, "r", encoding="utf-8") as r:
+ lines = r.readlines()
+
+ if len(lines) > 0:
+ index = 0
+
+ for line in lines:
+ if arco_test_repo[0] == line.strip().replace(" ", ""):
+ index = lines.index(line)
+
+ if index > 0:
+ if distr != "arcolinux":
+ del lines[index]
+ del lines[index]
+ del lines[index]
+
+ # make sure the arco testing repo is disabled, if absolutely required update the pacman conf file manually and enable them
+
+ if "%s" % arco_test_repo[0].replace("#", "") == line.strip():
+ index = lines.index(
+ "%s\n" % arco_test_repo[0].replace("#", "")
+ )
+ if distr != "arcolinux":
+ del lines[index]
+ del lines[index]
+ del lines[index]
+ else:
+ # comment out the testing repo
+
+ lines[index] = "%s\n" % arco_test_repo[0]
+ lines[index + 1] = "%s\n" % arco_test_repo[1]
+ lines[index + 2] = "%s\n" % arco_test_repo[2]
+
+ if "%s\n" % arco_repo[0] == line:
+ index = lines.index("%s\n" % arco_repo[0])
+
+ if index > 0:
+ if distr != "arcolinux":
+ del lines[index]
+ del lines[index]
+ del lines[index]
+ else:
+ lines[index] = "#%s\n" % arco_repo[0]
+ lines[index + 1] = "#%s\n" % arco_repo[1]
+ lines[index + 2] = "#%s\n" % arco_repo[2]
+ elif (
+ "#" in line.strip()
+ and arco_repo[0] == line.replace("#", "").strip()
+ and distr != "arcolinux"
+ ):
+ # check if already commented
+
+ index = lines.index(line)
+ del lines[index]
+ del lines[index]
+ del lines[index]
+
+ if "%s\n" % arco_3rd_party_repo[0] == line:
+ index = lines.index("%s\n" % arco_3rd_party_repo[0])
+
+ if index > 0:
+ if distr != "arcolinux":
+ del lines[index]
+ del lines[index]
+ del lines[index]
+ else:
+ lines[index] = "#%s\n" % arco_3rd_party_repo[0]
+ lines[index + 1] = "#%s\n" % arco_3rd_party_repo[1]
+ lines[index + 2] = "#%s\n" % arco_3rd_party_repo[2]
+ elif (
+ "#" in line.strip()
+ and arco_3rd_party_repo[0] == line.replace("#", "").strip()
+ and distr != "arcolinux"
+ ):
+ # check if already commented
+
+ index = lines.index(line)
+ del lines[index]
+ del lines[index]
+ del lines[index]
+
+ if "%s\n" % arco_xlrepo[0] == line:
+ index = lines.index("%s\n" % arco_xlrepo[0])
+
+ if index > 0:
+ if distr != "arcolinux":
+ del lines[index]
+ del lines[index]
+ del lines[index]
+ else:
+ lines[index] = "#%s\n" % arco_xlrepo[0]
+ lines[index + 1] = "#%s\n" % arco_xlrepo[1]
+ lines[index + 2] = "#%s\n" % arco_xlrepo[2]
+ elif (
+ "#" in line.strip()
+ and arco_xlrepo[0] == line.replace("#", "").strip()
+ and distr != "arcolinux"
+ ):
+ # check if already commented
+
+ index = lines.index(line)
+ del lines[index]
+ del lines[index]
+ del lines[index]
+
+ # remove any white spaces from end of the file only if on non arcolinux system
+ # on any non arcolinux distro lines are deleted which leaves empty lines in the file
+ # causing the file to grow in size
+ if distr != "arcolinux":
+ if lines[-1] == "\n":
+ del lines[-1]
+
+ if lines[-2] == "\n":
+ del lines[-2]
+
+ if lines[-3] == "\n":
+ del lines[-3]
+
+ if lines[-4] == "\n":
+ del lines[-4]
+
+ logger.info("[Remove ArcoLinux Repos] Writing to %s" % pacman_conf)
+
+ if len(lines) > 0:
+ with open(pacman_conf, "w") as w:
+ w.writelines(lines)
+
+ w.flush()
+
+ return 0
+
+ else:
+ logger.error("Failed to process %s" % pacman_conf)
+
+ else:
+ logger.error("Failed to read %s" % pacman_conf)
+ else:
+ logger.info("No ArcoLinux repos setup inside pacman conf file")
+ return 0
+
+ except Exception as e:
+ logger.error("Exception in remove_arco_repos(): %s" % e)
+ return e
+
+
+# check if pacman.conf has arco repos setup
+
+
+def verify_arco_pacman_conf():
+ try:
+ lines = None
+ arco_repo_setup = False
+ arco_3rd_party_repo_setup = False
+ arco_xlrepo_setup = False
+ with open(pacman_conf, "r") as r:
+ lines = r.readlines()
+
+ if lines is not None:
+ for line in lines:
+ if arco_repo[0] in line.strip():
+ if "#" not in line.strip():
+ arco_repo_setup = True
+ else:
+ return False
+
+ if arco_3rd_party_repo[0] in line.strip():
+ if "#" not in line.strip():
+ arco_3rd_party_repo_setup = True
+ else:
+ return False
+
+ if arco_xlrepo[0] in line.strip():
+ if "#" not in line.strip():
+ arco_xlrepo_setup = True
+ else:
+ return False
+
+ if (
+ arco_repo_setup is True
+ and arco_3rd_party_repo_setup is True
+ and arco_xlrepo_setup is True
+ ):
+ return True
+ else:
+ return False
+ except Exception as e:
+ logger.error("Exception in check_arco_pacman(): %s" % e)
+
+
+# =====================================================
+# CHECK IF PACKAGE IS INSTALLED
+# =====================================================
+
+
+# check if package is installed or not
+def check_package_installed(package_name):
+ # query_str = ["pacman", "-Qi", package]
+ query_str = ["pacman", "-Qq"]
+ try:
+ process_pkg_installed = subprocess.run(
+ query_str,
+ shell=False,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
+ timeout=process_timeout,
+ universal_newlines=True,
+ )
+
+ if package_name in process_pkg_installed.stdout.splitlines():
+ return True
+ else:
+ # check if the package is in the local pacman db
+ if check_pacman_localdb(package_name):
+ return True
+ else:
+ return False
+
+ except subprocess.CalledProcessError:
+ # package is not installed
+ return False
+
+
+# =====================================================
+# QUERY THE LOCAL PACMAN DB FOR PACKAGE
+# =====================================================
+
+# This is used to validate a package install/uninstall
+
+
+# check if package is installed or not
+def check_pacman_localdb(package_name):
+ query_str = ["pacman", "-Qi", package_name]
+
+ try:
+ process_pkg_installed = subprocess.run(
+ query_str,
+ shell=False,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
+ timeout=process_timeout,
+ )
+
+ if process_pkg_installed.returncode == 0:
+ for line in process_pkg_installed.stdout.decode("utf-8").splitlines():
+ if line.startswith("Name :"):
+ if line.replace(" ", "").split("Name:")[1].strip() == package_name:
+ return True
+
+ if line.startswith("Replaces :"):
+ replaces = line.split("Replaces :")[1].strip()
+ if len(replaces) > 0:
+ if package_name in replaces:
+ return True
+
+ else:
+ return False
+
+ except subprocess.CalledProcessError:
+ # package is not installed
+ return False
+
+
+# =====================================================
+# CHECK RUNNING PROCESS
+# =====================================================
+
+
+def check_if_process_running(process_name):
+ for proc in psutil.process_iter():
+ try:
+ pinfo = proc.as_dict(attrs=["pid", "name", "create_time"])
+ if process_name == pinfo["pid"]:
+ return True
+ except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
+ pass
+ return False
+
+
+# =====================================================
+# NOTIFICATIONS
+# =====================================================
+
+
+def show_in_app_notification(self, message, err):
+ if self.timeout_id is not None:
+ GLib.source_remove(self.timeout_id)
+ self.timeout_id = None
+
+ if err is True:
+ self.notification_label.set_markup(
+ '' + message + ""
+ )
+ else:
+ self.notification_label.set_markup(
+ '' + message + ""
+ )
+ self.notification_revealer.set_reveal_child(True)
+ self.timeout_id = GLib.timeout_add(3000, timeout, self)
+
+
+def timeout(self):
+ close_in_app_notification(self)
+
+
+def close_in_app_notification(self):
+ self.notification_revealer.set_reveal_child(False)
+ GLib.source_remove(self.timeout_id)
+ self.timeout_id = None
+
+
+def reveal_infobar(self, progress_dialog):
+ progress_dialog.infobar.set_revealed(True)
+ progress_dialog.infobar.show_all()
+ GLib.source_remove(self.timeout_id)
+ self.timeout_id = None
+
+
+"""
+ Since the app could be quit/terminated at any time during a pacman transaction.
+ The pacman process spawned by the install/uninstall threads, needs to be terminated too.
+ Otherwise the app may hang waiting for pacman to complete its transaction.
+"""
+# =====================================================
+# PACMAN
+# =====================================================
+
+
+def terminate_pacman():
+ try:
+ process_found = False
+ for proc in psutil.process_iter():
+ try:
+ pinfo = proc.as_dict(attrs=["pid", "name", "create_time"])
+ if pinfo["name"] == "pacman":
+ process_found = True
+ logger.debug("Killing pacman process = %s" % pinfo["name"])
+
+ proc.kill()
+
+ except (psutil.NoSuchProcess, psutil.AccessDenied):
+ continue
+
+ if process_found is True:
+ if check_pacman_lockfile():
+ os.unlink(pacman_lockfile)
+ except Exception as e:
+ logger.error("Exception in terminate_pacman() : %s" % e)
+
+
+def is_thread_alive(thread_name):
+ for thread in threading.enumerate():
+ if thread.name == thread_name and thread.is_alive():
+ return True
+
+ return False
+
+
+def print_running_threads():
+ threads_alive = []
+ for thread in threading.enumerate():
+ if thread.is_alive():
+ threads_alive.append(thread.name)
+
+ for th in threads_alive:
+ logger.debug("Thread = %s status = alive" % th)
+
+
+# this keeps monitoring for items on the package holding queue
+# items are added to the queue if a package install is stuck behind another pacman transaction
+def check_holding_queue(self):
+ while True:
+ (
+ package,
+ action,
+ widget,
+ cmd_str,
+ progress_dialog,
+ ) = self.pkg_holding_queue.get()
+
+ try:
+ # logger.debug("Enqueued package = %s" % package.name)
+
+ while check_pacman_lockfile() is True:
+ # logger.debug("Pacman is processing a transaction")
+ time.sleep(0.2)
+
+ th_subprocess = Thread(
+ name="thread_subprocess",
+ target=start_subprocess,
+ args=(
+ self,
+ cmd_str,
+ progress_dialog,
+ action,
+ package,
+ widget,
+ ),
+ daemon=True,
+ )
+
+ th_subprocess.start()
+
+ finally:
+ self.pkg_holding_queue.task_done()
+
+
+# check if pacman lock file exists
+def check_pacman_lockfile():
+ try:
+ if os.path.exists(pacman_lockfile):
+ # logger.debug("Pacman lockfile found inside %s" % pacman_lockfile)
+ # logger.debug("Another pacman process is running")
+ return True
+ else:
+ # logger.info("No pacman lockfile found, OK to proceed")
+ return False
+ except Exception as e:
+ logger.error("Exception in check_pacman_lockfile() : %s" % e)
+
+
+# this gets info on the pacman process currently running
+def get_pacman_process():
+ try:
+ for proc in psutil.process_iter():
+ try:
+ pinfo = proc.as_dict(attrs=["pid", "name", "create_time"])
+ if pinfo["name"] == "pacman":
+ return " ".join(proc.cmdline())
+
+ except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
+ pass
+
+ except Exception as e:
+ logger.error("Exception in get_pacman_process() : %s" % e)
+
+
+# used inside PackageImportDialog to display package installation progress
+def update_package_import_textview(self, line):
+ try:
+ if len(line) > 0:
+ self.msg_buffer.insert(
+ self.msg_buffer.get_end_iter(),
+ " %s" % line,
+ len(" %s" % line),
+ )
+
+ except Exception as e:
+ logger.error("Exception in update_progress_textview(): %s" % e)
+ finally:
+ self.pkg_import_queue.task_done()
+ text_mark_end = self.msg_buffer.create_mark(
+ "end", self.msg_buffer.get_end_iter(), False
+ )
+ # scroll to the end of the textview
+ self.textview.scroll_mark_onscreen(text_mark_end)
+
+
+def monitor_package_import(self):
+ while True:
+ if self.stop_thread is True:
+ break
+ message = self.pkg_import_queue.get()
+ GLib.idle_add(
+ update_package_import_textview,
+ self,
+ message,
+ priority=GLib.PRIORITY_DEFAULT,
+ )
+
+ # time.sleep(0.2)
+
+
+# update the package install status label called from outside the main thread
+def update_package_status_label(label, text):
+ label.set_markup(text)
+
+
+def import_packages(self):
+ try:
+ packages_status_list = []
+ package_failed = False
+ package_err = {}
+
+ count = 0
+
+ # clean pacman cache
+
+ if os.path.exists(pacman_cache_dir):
+ query_pacman_clean_cache_str = ["pacman", "-Sc", "--noconfirm"]
+
+ logger.info("Cleaning Pacman cache directory = %s" % pacman_cache_dir)
+
+ event = "%s [INFO]: Cleaning pacman cache\n" % datetime.now().strftime(
+ "%Y-%m-%d-%H-%M-%S"
+ )
+
+ self.pkg_import_queue.put(event)
+
+ GLib.idle_add(
+ update_package_status_label,
+ self.label_package_status,
+ "Status: Cleaning pacman cache",
+ )
+
+ # clean the pacman cache, so we don't run into any invalid/corrupt package errors during install
+ process_pacman_cc = subprocess.Popen(
+ query_pacman_clean_cache_str,
+ shell=False,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ universal_newlines=True,
+ )
+
+ out, err = process_pacman_cc.communicate(timeout=process_timeout)
+
+ self.pkg_import_queue.put(out)
+
+ if process_pacman_cc.returncode == 0:
+ logger.info("Pacman cache directory cleaned")
+ else:
+ logger.error("Failed to clean Pacman cache directory")
+
+ logger.info("Running full system upgrade")
+ # run full system upgrade, Arch does not allow partial package updates
+ query_str = ["pacman", "-Syu", "--noconfirm"]
+ # query_str = ["pacman", "-Qqen"]
+ logger.info("Running %s" % " ".join(query_str))
+
+ event = "%s [INFO]:Running full system upgrade\n" % datetime.now().strftime(
+ "%Y-%m-%d-%H-%M-%S"
+ )
+
+ self.pkg_import_queue.put(event)
+
+ GLib.idle_add(
+ update_package_status_label,
+ self.label_package_status,
+ "Status: Performing full system upgrade - do not power off your system",
+ )
+
+ output = []
+
+ with subprocess.Popen(
+ query_str,
+ shell=False,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
+ bufsize=1,
+ universal_newlines=True,
+ ) as process:
+ while True:
+ if process.poll() is not None:
+ break
+
+ for line in process.stdout:
+ # print(line.strip())
+
+ self.pkg_import_queue.put(line)
+
+ output.append(line)
+
+ # time.sleep(0.2)
+
+ if process.returncode == 0:
+ logger.info("Pacman system upgrade completed")
+ GLib.idle_add(
+ update_package_status_label,
+ self.label_package_status,
+ "Status: Full system upgrade - completed",
+ )
+ else:
+ if len(output) > 0:
+ if "there is nothing to do" not in output:
+ logger.error("Pacman system upgrade failed")
+ GLib.idle_add(
+ update_package_status_label,
+ self.label_package_status,
+ "Status: Full system upgrade - failed",
+ )
+
+ print("%s" % " ".join(output))
+
+ event = "%s [ERROR]: Installation of packages aborted due to errors\n" % datetime.now().strftime(
+ "%Y-%m-%d-%H-%M-%S"
+ )
+
+ self.pkg_import_queue.put(event)
+
+ logger.error("Installation of packages aborted due to errors")
+
+ return
+
+ # do not proceed with package installs if system upgrade fails
+ else:
+ return
+
+ # iterate through list of packages, calling pacman -S on each one
+ for package in self.packages_list:
+ process_output = []
+ package = package.strip()
+ if len(package) > 0:
+ if "#" not in package:
+ query_str = ["pacman", "-S", package, "--needed", "--noconfirm"]
+
+ count += 1
+
+ logger.info("Running %s" % " ".join(query_str))
+
+ event = "%s [INFO]: Running %s\n" % (
+ datetime.now().strftime("%Y-%m-%d-%H-%M-%S"),
+ " ".join(query_str),
+ )
+
+ self.pkg_import_queue.put(event)
+
+ with subprocess.Popen(
+ query_str,
+ shell=False,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
+ bufsize=1,
+ universal_newlines=True,
+ ) as process:
+ while True:
+ if process.poll() is not None:
+ break
+ for line in process.stdout:
+ process_output.append(line.strip())
+
+ self.pkg_import_queue.put(line)
+
+ # time.sleep(0.2)
+
+ if process.returncode == 0:
+ # since this is being run in another thread outside of main, use GLib to update UI component
+ GLib.idle_add(
+ update_package_status_label,
+ self.label_package_status,
+ "Status: %s -> Installed" % package,
+ )
+
+ GLib.idle_add(
+ update_package_status_label,
+ self.label_package_count,
+ "Progress: %s/%s"
+ % (count, len(self.packages_list)),
+ )
+
+ packages_status_list.append("%s -> Installed" % package)
+
+ else:
+ logger.error("%s --> Install failed" % package)
+ GLib.idle_add(
+ update_package_status_label,
+ self.label_package_status,
+ "Status: %s -> Install failed" % package,
+ )
+
+ GLib.idle_add(
+ update_package_status_label,
+ self.label_package_count,
+ "Progress: %s/%s"
+ % (count, len(self.packages_list)),
+ )
+
+ if len(process_output) > 0:
+ if "there is nothing to do" not in process_output:
+ logger.error("%s" % " ".join(process_output))
+ # store package error in dict
+ package_err[package] = " ".join(process_output)
+
+ package_failed = True
+
+ packages_status_list.append("%s -> Failed" % package)
+
+ if len(packages_status_list) > 0:
+ self.pkg_status_queue.put(packages_status_list)
+
+ if package_failed is True:
+ GLib.idle_add(
+ update_package_status_label,
+ self.label_package_status,
+ "Some packages have failed to install see %s" % self.logfile,
+ )
+
+ # end
+ event = "%s [INFO]: Completed, check the logfile for any errors\n" % (
+ datetime.now().strftime("%Y-%m-%d-%H-%M-%S"),
+ )
+
+ self.pkg_import_queue.put(event)
+
+ except Exception as e:
+ logger.error("Exception in import_packages(): %s" % e)
+ finally:
+ self.pkg_err_queue.put(package_err)
+
+
+# package install completed now log status to log file
+def log_package_status(self):
+ logger.info("Logging package status")
+ packages_status_list = None
+ package_err = None
+ while True:
+ try:
+ time.sleep(0.2)
+ packages_status_list = self.pkg_status_queue.get()
+ package_err = self.pkg_err_queue.get()
+
+ finally:
+ self.pkg_status_queue.task_done()
+ self.pkg_err_queue.task_done()
+ with open(self.logfile, "w") as f:
+ f.write(
+ "# This file was auto-generated by Sofirem on %s at %s\n"
+ % (
+ datetime.today().date(),
+ datetime.now().strftime("%H:%M:%S"),
+ ),
+ )
+ if packages_status_list is not None:
+ for package in packages_status_list:
+ if package.split("->")[0].strip() in package_err:
+ f.write("%s\n" % package)
+ f.write(
+ "\tERROR: %s\n"
+ % package_err[package.split("->")[0].strip()]
+ )
+ else:
+ f.write("%s\n" % package)
+
+ break
+
+
+# open sofirem log directory
+def open_log_dir():
+ try:
+ subprocess.Popen(
+ ["sudo", "-u", sudo_username, "xdg-open", log_dir],
+ shell=False,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
+ )
+ except Exception as e:
+ logger.error("Exception in open_log_dir(): %s" % e)
+
+
+# ANYTHING UNDER THIS LINE IS CURRENTLY UNUSED!
\ No newline at end of file