Files
snigdhaos-blackbox/usr/share/blackbox/Functions.py
2024-04-19 05:17:41 +05:30

2672 lines
90 KiB
Python

# =================================================================
# = Author: Cameron Percival =
# =================================================================
import os
import sys
import psutil
import time
import datetime
from datetime import datetime, timedelta
import subprocess
import threading
import gi
import logging
from logging.handlers import TimedRotatingFileHandler
import shutil
from threading import Thread
from Package import Package
from Settings import Settings
from ui.MessageDialog import MessageDialog
from distro import id
from os import makedirs
import Functions as fn
gi.require_version("Gtk", "3.0")
from gi.repository import GLib, Gtk
# =====================================================
# Base Directory
# =====================================================
base_dir = os.path.dirname(os.path.realpath(__file__))
# =====================================================
# Global Variables
# =====================================================
sudo_username = os.getlogin()
home = "/home/" + str(sudo_username)
path_dir_cache = base_dir + "/cache/"
packages = []
distr = id()
blackbox_lockfile = "/tmp/blackbox.lock"
blackbox_pidfile = "/tmp/blackbox.pid"
# 10m timeout
process_timeout = 600
snigdhaos_mirrorlist = "/etc/pacman.d/snigdhaos-mirrorlist"
pacman_conf = "/etc/pacman.conf"
pacman_conf_backup = "/etc/pacman.conf.bak"
pacman_logfile = "/var/log/pacman.log"
pacman_lockfile = "/var/lib/pacman/db.lck"
pacman_cache_dir = "/var/cache/pacman/pkg/"
snigdhaos_core = [
"[snigdhaos-core]",
"SigLevel = PackageRequired DatabaseNever",
"Include = /etc/pacman.d/snigdhaos-mirrorlist",
]
snigdhaos_extra = [
"[snigdhaos-extra]",
"SigLevel = PackageRequired DatabaseNever",
"Include = /etc/pacman.d/snigdhaos-mirrorlist",
]
log_dir = "/var/log/blackbox/"
config_dir = "%s/.config/blackbox" % home
config_file = "%s/blackbox.yaml" % config_dir
event_log_file = "%s/event.log" % log_dir
export_dir = "%s/blackbox-exports" % home
# =====================================================
# PERMISSIONS
# =====================================================
def permissions(dst):
try:
groups = subprocess.run(
["sh", "-c", "id " + sudo_username],
shell=False,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
for x in groups.stdout.decode().split(" "):
if "gid" in x:
g = x.split("(")[1]
group = g.replace(")", "").strip()
subprocess.call(["chown", "-R", sudo_username + ":" + group, dst], shell=False)
except Exception as e:
logger.error(e)
# Create log, export, conf directory
try:
if not os.path.exists(log_dir):
makedirs(log_dir)
if not os.path.exists(export_dir):
makedirs(export_dir)
if not os.path.exists(config_dir):
makedirs(config_dir)
permissions(export_dir)
permissions(config_dir)
print("[INFO] Log directory = %s" % log_dir)
print("[INFO] Export directory = %s" % export_dir)
print("[INFO] Config directory = %s" % config_dir)
except os.error as oe:
print("[ERROR] Exception in setup log/export directory: %s" % oe)
sys.exit(1)
# read in conf file from $HOME/.config/blackbox/blackbox.yaml
# initialize logger
try:
settings = Settings(False, False)
settings_config = settings.read_config_file()
logger = logging.getLogger("logger")
# create console handler and set level to debug
ch = logging.StreamHandler()
# rotate the events log every Friday
tfh = TimedRotatingFileHandler(
event_log_file, encoding="utf-8", delay=False, when="W4"
)
if settings_config:
debug_logging_enabled = None
debug_logging_enabled = settings_config["Debug Logging"]
if debug_logging_enabled is not None and debug_logging_enabled is True:
logger.setLevel(logging.DEBUG)
ch.setLevel(logging.DEBUG)
tfh.setLevel(level=logging.DEBUG)
else:
logger.setLevel(logging.INFO)
ch.setLevel(logging.INFO)
tfh.setLevel(level=logging.INFO)
else:
logger.setLevel(logging.INFO)
ch.setLevel(logging.INFO)
tfh.setLevel(level=logging.INFO)
# create formatter
formatter = logging.Formatter(
"%(asctime)s:%(levelname)s > %(message)s", "%Y-%m-%d %H:%M:%S"
)
# add formatter to ch
ch.setFormatter(formatter)
tfh.setFormatter(formatter)
# add ch to logger
logger.addHandler(ch)
# add fh to logger
logger.addHandler(tfh)
except Exception as e:
print("[ERROR] Failed to setup logger, exception: %s" % e)
# on app close create file of installed packages
def _on_close_create_packages_file():
try:
logger.info("App closing saving currently installed packages to file")
packages_file = "%s-packages.txt" % datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
logger.info("Saving in %s%s" % (log_dir, packages_file))
cmd = ["pacman", "-Q"]
with subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
bufsize=1,
universal_newlines=True,
) as process:
with open("%s/%s" % (log_dir, packages_file), "w") as f:
for line in process.stdout:
f.write("%s" % line)
except Exception as e:
logger.error("Exception in on_close_create_packages_file(): %s" % e)
# =====================================================
# GLOBAL FUNCTIONS
# =====================================================
def _get_position(lists, value):
data = [string for string in lists if value in string]
position = lists.index(data[0])
return position
def is_file_stale(filepath, stale_days, stale_hours, stale_minutes):
# first, lets obtain the datetime of the day that we determine data to be "stale"
now = datetime.now()
# For the purposes of this, we are assuming that one would have the app open longer than 5 minutes if installing.
stale_datetime = now - timedelta(
days=stale_days, hours=stale_hours, minutes=stale_minutes
)
# Check to see if the file path is in existence.
if os.path.exists(filepath):
# if the file exists, when was it made?
file_created = datetime.fromtimestamp(os.path.getctime(filepath))
# file is older than the time delta identified above
if file_created < stale_datetime:
return True
return False
# =====================================================
# PACMAN SYNC PACKAGE DB
# =====================================================
def sync_package_db():
try:
sync_str = ["pacman", "-Sy"]
logger.info("Synchronizing pacman package databases")
process_sync = subprocess.run(
sync_str,
shell=False,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
timeout=process_timeout,
)
if process_sync.returncode == 0:
return None
else:
if process_sync.stdout:
out = str(process_sync.stdout.decode("utf-8"))
logger.error(out)
return out
except Exception as e:
logger.error("Exception in sync_package_db(): %s" % e)
# =====================================================
# PACMAN SYNC FILES DB
# =====================================================
def sync_file_db():
try:
sync_str = ["pacman", "-Fy"]
logger.info("Synchronizing pacman file database")
process_sync = subprocess.run(
sync_str,
shell=False,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
timeout=process_timeout,
)
if process_sync.returncode == 0:
return None
else:
if process_sync.stdout:
out = str(process_sync.stdout.decode("utf-8"))
logger.error(out)
return out
except Exception as e:
logger.error("Exception in sync_file_db(): %s" % e)
# =====================================================
# PACMAN INSTALL/UNINSTALL PROCESS
# =====================================================
# this is run inside a separate thread
def start_subprocess(self, cmd, progress_dialog, action, pkg, widget):
try:
self.switch_package_version.set_sensitive(False)
self.switch_snigdhaos_keyring.set_sensitive(False)
self.switch_snigdhaos_mirrorlist.set_sensitive(False)
widget.set_sensitive(False)
# store process std out into a list, if there are errors display to user once the process completes
process_stdout_lst = []
process_stdout_lst.append("Command = %s\n\n" % " ".join(cmd))
with subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
bufsize=1,
universal_newlines=True,
) as process:
if progress_dialog is not None:
progress_dialog.pkg_dialog_closed = False
self.in_progress = True
if (
progress_dialog is not None
and progress_dialog.pkg_dialog_closed is False
):
line = (
"Pacman is processing the %s of package %s \n\nCommand running = %s\n\n"
% (action, pkg.name, " ".join(cmd))
)
GLib.idle_add(
update_progress_textview,
self,
line,
progress_dialog,
priority=GLib.PRIORITY_DEFAULT,
)
logger.debug("Pacman is now processing the request")
# poll for the process to complete
# read stdout as it comes in, update the progress textview
# poll() Check if child process has terminated.
# Set and return returncode attribute. Otherwise, returns None.
while True:
if process.poll() is not None:
break
if (
progress_dialog is not None
and progress_dialog.pkg_dialog_closed is False
):
for line in process.stdout:
GLib.idle_add(
update_progress_textview,
self,
line,
progress_dialog,
priority=GLib.PRIORITY_DEFAULT,
)
process_stdout_lst.append(line)
time.sleep(0.3)
else:
# increase wait time to reduce cpu load, no textview updates required since dialog is closed
# since the progress dialog window is closed, capture errors and then later display it
for line in process.stdout:
process_stdout_lst.append(line)
time.sleep(1)
returncode = None
returncode = process.poll()
# logger.debug("Pacman process return code = %s" % returncode)
if returncode is not None:
logger.info("Pacman process completed running = [ %s ]" % " ".join(cmd))
GLib.idle_add(
refresh_ui,
self,
action,
widget,
pkg,
progress_dialog,
process_stdout_lst,
priority=GLib.PRIORITY_DEFAULT,
)
else:
# an error happened during the pacman transaction
logger.error(
"Pacman process failed when running = [ %s ]" % " ".join(cmd)
)
except TimeoutError as t:
logger.error("TimeoutError in %s start_subprocess(): %s" % (action, t))
process.terminate()
if progress_dialog is not None:
progress_dialog.btn_package_progress_close.set_sensitive(True)
self.switch_package_version.set_sensitive(True)
self.switch_snigdhaos_keyring.set_sensitive(True)
self.switch_snigdhaos_mirrorlist.set_sensitive(True)
except SystemError as s:
logger.error("SystemError in %s start_subprocess(): %s" % (action, s))
process.terminate()
if progress_dialog is not None:
progress_dialog.btn_package_progress_close.set_sensitive(True)
self.switch_package_version.set_sensitive(True)
self.switch_snigdhaos_keyring.set_sensitive(True)
self.switch_snigdhaos_mirrorlist.set_sensitive(True)
# refresh ui components, once the process completes
# show notification dialog to user if errors are encountered during package install/uninstall
def refresh_ui(self, action, switch, pkg, progress_dialog, process_stdout_lst):
self.switch_package_version.set_sensitive(True)
self.switch_snigdhaos_keyring.set_sensitive(True)
self.switch_snigdhaos_mirrorlist.set_sensitive(True)
logger.debug("Checking if package %s is installed" % pkg.name)
installed = check_package_installed(pkg.name)
if progress_dialog is not None:
progress_dialog.btn_package_progress_close.set_sensitive(True)
if installed is True and action == "install":
logger.debug("Toggle switch state = True")
switch.set_sensitive(True)
switch.set_state(True)
switch.set_active(True)
if progress_dialog is not None:
if progress_dialog.pkg_dialog_closed is False:
progress_dialog.set_title("Package install for %s completed" % pkg.name)
progress_dialog.infobar.set_name("infobar_info")
content = progress_dialog.infobar.get_content_area()
if content is not None:
for widget in content.get_children():
content.remove(widget)
lbl_install = Gtk.Label(xalign=0, yalign=0)
lbl_install.set_markup("<b>Package %s installed</b>" % pkg.name)
content.add(lbl_install)
if self.timeout_id is not None:
GLib.source_remove(self.timeout_id)
self.timeout_id = None
self.timeout_id = GLib.timeout_add(
100, reveal_infobar, self, progress_dialog
)
if installed is False and action == "install":
logger.debug("Toggle switch state = False")
if progress_dialog is not None:
# install failed/terminated
switch.set_sensitive(True)
switch.set_state(False)
switch.set_active(False)
if progress_dialog.pkg_dialog_closed is False:
progress_dialog.set_title("Package install for %s failed" % pkg.name)
progress_dialog.infobar.set_name("infobar_error")
content = progress_dialog.infobar.get_content_area()
if content is not None:
for widget in content.get_children():
content.remove(widget)
lbl_install = Gtk.Label(xalign=0, yalign=0)
lbl_install.set_markup(
"<b>Package %s install failed</b>" % pkg.name
)
content.add(lbl_install)
if self.timeout_id is not None:
GLib.source_remove(self.timeout_id)
self.timeout_id = None
self.timeout_id = GLib.timeout_add(
100, reveal_infobar, self, progress_dialog
)
else:
logger.debug(" ".join(process_stdout_lst))
message_dialog = MessageDialog(
"Errors occurred during install",
"Errors occurred install for %s failed" % pkg.name,
"Pacman failed to install package %s\n" % pkg.name,
" ".join(process_stdout_lst),
"error",
True,
)
message_dialog.show_all()
result = message_dialog.run()
message_dialog.destroy()
elif progress_dialog is None or progress_dialog.pkg_dialog_closed is True:
# the package progress dialog has been closed, but notify user package failed to install
if (
"error: failed to init transaction (unable to lock database)\n"
in process_stdout_lst
):
# at this point the package install is stuck behind another pacman transaction so put this onto the holding queue
# logger.debug(" ".join(process_stdout_lst))
if progress_dialog is None:
logger.debug("Adding package to holding queue")
if self.display_package_progress is False:
inst_str = [
"pacman",
"-S",
pkg.name,
"--needed",
"--noconfirm",
]
self.pkg_holding_queue.put(
(
pkg,
action,
switch,
inst_str,
progress_dialog,
),
)
else:
logger.debug(" ".join(process_stdout_lst))
switch.set_sensitive(True)
switch.set_state(False)
switch.set_active(False)
proc = fn.get_pacman_process()
message_dialog = MessageDialog(
"Warning",
"BlackBox cannot proceed pacman lockfile found",
"Pacman cannot lock the db, a lockfile is found inside %s"
% fn.pacman_lockfile,
"Pacman is running: %s" % proc,
"warning",
False,
)
message_dialog.show_all()
result = message_dialog.run()
message_dialog.destroy()
# progress dialog is closed show message dialog with error
elif "error: target not found: %s\n" % pkg.name in process_stdout_lst:
switch.set_sensitive(True)
switch.set_state(False)
switch.set_active(False)
message_dialog = MessageDialog(
"Error",
"Pacman repository error: package '%s' was not found" % pkg.name,
"BlackBox cannot process the request",
"Are the correct pacman mirrorlists configured ?",
"error",
False,
)
message_dialog.show_all()
result = message_dialog.run()
message_dialog.destroy()
else:
# logger.debug(" ".join(process_stdout_lst))
switch.set_sensitive(True)
switch.set_state(False)
switch.set_active(False)
message_dialog = MessageDialog(
"Errors occurred during install",
"Errors occurred install for %s failed" % pkg.name,
"Pacman failed to install package %s\n" % pkg.name,
" ".join(process_stdout_lst),
"error",
True,
)
message_dialog.show_all()
result = message_dialog.run()
message_dialog.destroy()
if installed is False and action == "uninstall":
logger.debug("Toggle switch state = False")
switch.set_sensitive(True)
switch.set_state(False)
switch.set_active(False)
if progress_dialog is not None:
if progress_dialog.pkg_dialog_closed is False:
progress_dialog.set_title(
"Package uninstall for %s completed" % pkg.name
)
progress_dialog.infobar.set_name("infobar_info")
content = progress_dialog.infobar.get_content_area()
if content is not None:
for widget in content.get_children():
content.remove(widget)
lbl_install = Gtk.Label(xalign=0, yalign=0)
lbl_install.set_markup("<b>Package %s uninstalled</b>" % pkg.name)
content.add(lbl_install)
if self.timeout_id is not None:
GLib.source_remove(self.timeout_id)
self.timeout_id = None
self.timeout_id = GLib.timeout_add(
100, reveal_infobar, self, progress_dialog
)
if installed is True and action == "uninstall":
# uninstall failed/terminated
switch.set_sensitive(True)
switch.set_state(True)
switch.set_active(True)
if progress_dialog is not None:
if progress_dialog.pkg_dialog_closed is False:
progress_dialog.set_title("Package uninstall for %s failed" % pkg.name)
progress_dialog.infobar.set_name("infobar_error")
content = progress_dialog.infobar.get_content_area()
if content is not None:
for widget in content.get_children():
content.remove(widget)
lbl_install = Gtk.Label(xalign=0, yalign=0)
lbl_install.set_markup(
"<b>Package %s uninstall failed</b>" % pkg.name
)
content.add(lbl_install)
if self.timeout_id is not None:
GLib.source_remove(self.timeout_id)
self.timeout_id = None
self.timeout_id = GLib.timeout_add(
100, reveal_infobar, self, progress_dialog
)
elif progress_dialog is None or progress_dialog.pkg_dialog_closed is True:
# the package progress dialog has been closed, but notify user package failed to uninstall
if (
"error: failed to init transaction (unable to lock database)\n"
in process_stdout_lst
):
logger.error(" ".join(process_stdout_lst))
else:
message_dialog = MessageDialog(
"Errors occurred during uninstall",
"Errors occurred uninstall of %s failed" % pkg.name,
"Pacman failed to uninstall package %s\n" % pkg.name,
" ".join(process_stdout_lst),
"error",
True,
)
message_dialog.show_all()
result = message_dialog.run()
message_dialog.destroy()
# update progress textview using stdout from the pacman process running
def update_progress_textview(self, line, progress_dialog):
if (
progress_dialog is not None
and progress_dialog.pkg_dialog_closed is False
and self.in_progress is True
):
buffer = progress_dialog.package_progress_textview.get_buffer()
if len(line) > 0 or buffer is None:
buffer.insert(buffer.get_end_iter(), "%s" % line, len("%s" % line))
text_mark_end = buffer.create_mark("\nend", buffer.get_end_iter(), False)
# scroll to the end of the textview
progress_dialog.package_progress_textview.scroll_mark_onscreen(
text_mark_end
)
else:
# dialog window is closed
line = None
return False
# =====================================================
# APP INSTALLATION
# =====================================================
def install(self):
pkg, action, widget, inst_str, progress_dialog = self.pkg_queue.get()
try:
if action == "install":
# path = base_dir + "/cache/installed.lst"
logger.debug("Running inside install thread")
logger.info("Installing package %s" % pkg.name)
logger.debug(inst_str)
# run pacman process inside a thread
th_subprocess_install = Thread(
name="thread_subprocess",
target=start_subprocess,
args=(
self,
inst_str,
progress_dialog,
action,
pkg,
widget,
),
daemon=True,
)
th_subprocess_install.start()
logger.debug("Thread: subprocess install started")
except Exception as e:
# deactivate switch widget, install failed
widget.set_state(False)
if progress_dialog is not None:
progress_dialog.btn_package_progress_close.set_sensitive(True)
logger.error("Exception in install(): %s" % e)
finally:
# task completed
self.pkg_queue.task_done()
# =====================================================
# APP UNINSTALLATION
# =====================================================
def uninstall(self):
pkg, action, widget, uninst_str, progress_dialog = self.pkg_queue.get()
try:
if action == "uninstall":
# path = base_dir + "/cache/installed.lst"
logger.debug("Running inside uninstall thread")
logger.info("Uninstalling package %s" % pkg.name)
logger.debug(uninst_str)
# run pacman process inside a thread
th_subprocess_uninstall = Thread(
name="thread_subprocess",
target=start_subprocess,
args=(
self,
uninst_str,
progress_dialog,
action,
pkg,
widget,
),
daemon=True,
)
# is there a pacman lockfile, wait for it before uninstalling
# while check_pacman_lockfile():
# time.sleep(0.2)
th_subprocess_uninstall.start()
logger.debug("Thread: subprocess uninstall started")
except Exception as e:
widget.set_state(True)
if progress_dialog is not None:
progress_dialog.btn_package_progress_close.set_sensitive(True)
logger.error("Exception in uninstall(): %s" % e)
finally:
self.pkg_queue.task_done()
# =====================================================
# SEARCH INDEXING
# =====================================================
# store a list of package metadata into memory for fast retrieval
def store_packages():
path = base_dir + "/yaml/"
cache = base_dir + "/cache/yaml-packages.lst"
yaml_files = []
packages = []
category_dict = {}
try:
# get latest package version info
package_metadata = get_all_package_info()
# get a list of yaml files
for file in os.listdir(path):
if file.endswith(".yaml"):
yaml_files.append(file)
if len(yaml_files) > 0:
for yaml_file in yaml_files:
cat_desc = ""
package_name = ""
package_cat = ""
category_name = yaml_file[11:-5].strip().capitalize()
# read contents of each yaml file
with open(path + yaml_file, "r") as yaml:
content = yaml.readlines()
for line in content:
if line.startswith(" packages:"):
continue
elif line.startswith(" description: "):
# Set the label text for the description line
subcat_desc = (
line.strip(" description: ")
.strip()
.strip('"')
.strip("\n")
.strip()
)
elif line.startswith("- name:"):
# category
subcat_name = (
line.strip("- name: ")
.strip()
.strip('"')
.strip("\n")
.strip()
)
elif line.startswith(" - "):
# add the package to the packages list
package_name = line.strip(" - ").strip()
# get the package description
# package_desc = obtain_pkg_description(package_name)
package_version = "Unknown"
package_description = "Unknown"
for data in package_metadata:
if data["name"] == package_name:
package_version = data["version"]
package_description = data["description"]
break
if package_description == "Unknown":
package_description = obtain_pkg_description(package_name)
package = Package(
package_name,
package_description,
category_name,
subcat_name,
subcat_desc,
package_version,
)
packages.append(package)
# filter the results so that each category holds a list of package
category_name = None
packages_cat_lst = []
for pkg in packages:
if category_name == pkg.category:
packages_cat_lst.append(pkg)
category_dict[category_name] = packages_cat_lst
elif category_name is None:
packages_cat_lst.append(pkg)
category_dict[pkg.category] = packages_cat_lst
else:
# reset packages, new category
packages_cat_lst = []
packages_cat_lst.append(pkg)
category_dict[pkg.category] = packages_cat_lst
category_name = pkg.category
# Print dictionary for debugging
# for key in category_dict.keys():
# print("Category = %s" % key)
# pkg_list = category_dict[key]
#
# for pkg in pkg_list:
# print("%s" % pkg.name)
sorted_dict = None
sorted_dict = dict(sorted(category_dict.items()))
if sorted_dict is None:
logger.error(
"An error occurred during sort of stored packages in store_packages()"
)
else:
with open(cache, "w", encoding="UTF-8") as f:
for key in category_dict.keys():
pkg_list = category_dict[key]
for pkg in pkg_list:
f.write("%s\n" % pkg.name)
return sorted_dict
except Exception as e:
logger.error("Exception in store_packages() : %s" % e)
sys.exit(1)
def get_package_description(package):
query_str = ["pacman", "-Si", package]
try:
with subprocess.Popen(
query_str,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
bufsize=1,
universal_newlines=True,
) as process:
while True:
if process.poll() is not None:
break
returncode = None
returncode = process.poll()
if returncode is not None:
for line in process.stdout:
if "Description :" in line.strip():
return line.replace(" ", "").split("Description:")[1].strip()
except Exception as e:
logger.error("Exception in get_package_description(): %s" % e)
# =====================================================
# PACKAGE VERSIONS
# =====================================================
# get live package name, version info and repo name
# used in store_packages() and get_installed_package_data()
def get_all_package_info():
query_str = ["pacman", "-Si"]
try:
process_pkg_query = subprocess.Popen(
query_str, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
out, err = process_pkg_query.communicate(timeout=process_timeout)
if process_pkg_query.returncode == 0:
if out:
package_data = []
package_name = "Unknown"
package_version = "Unknown"
package_description = "Unknown"
package_repository = "Unknown"
for line in out.decode("utf-8").splitlines():
package_dict = {}
if "Name :" in line.strip():
package_name = line.replace(" ", "").split("Name:")[1].strip()
if "Version :" in line.strip():
package_version = (
line.replace(" ", "").split("Version:")[1].strip()
)
if "Description :" in line.strip():
package_description = line.split("Description :")[1].strip()
if "Repository :" in line.strip():
package_repository = line.split("Repository :")[1].strip()
package_dict["name"] = package_name
package_dict["version"] = package_version
package_dict["description"] = package_description
package_dict["repository"] = package_repository
package_data.append(package_dict)
return package_data
else:
logger.error("Failed to extract package version information.")
except Exception as e:
logger.error("Exception in get_all_package_info() : %s" % e)
# get installed package version, installed date, name to be displayed inside PackageListDialog
# for export and later import
def get_installed_package_data(self):
# to capture the latest package version
latest_package_data = get_all_package_info()
# query_str = ["pacman", "-Qi"]
# query_str = ["pacman", "-Qien"]
try:
installed_packages_list = []
pkg_name = None
pkg_version = None
pkg_install_date = None
pkg_installed_size = None
pkg_latest_version = None
with subprocess.Popen(
self.pacman_export_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
bufsize=1,
universal_newlines=True,
) as process:
for line in process.stdout:
if "Name :" in line.strip():
pkg_name = line.replace(" ", "").split("Name:")[1].strip()
if "Version :" in line.strip():
pkg_version = line.replace(" ", "").split("Version:")[1].strip()
if "Installed Size :" in line.strip():
pkg_installed_size = line.split("Installed Size :")[1].strip()
if "Install Date :" in line.strip():
pkg_install_date = line.split("Install Date :")[1].strip()
# get the latest version lookup dictionary
found = False
pkg_latest_version = None
for i in latest_package_data:
if i["name"] == pkg_name:
pkg_latest_version = i["version"]
break
installed_packages_list.append(
(
pkg_name,
pkg_version,
pkg_latest_version,
pkg_installed_size,
pkg_install_date,
)
)
self.pkg_export_queue.put(installed_packages_list)
# return installed_packages_list
except Exception as e:
logger.error("Exception in get_installed_package_data() : %s" % e)
# get list of files installed by a package
def get_package_files(package_name):
try:
query_str = ["pacman", "-Fl", package_name]
process = subprocess.run(
query_str,
shell=False,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
timeout=process_timeout,
)
if process.returncode == 0:
package_files = []
for line in process.stdout.decode("utf-8").splitlines():
package_files.append((line.split(" ")[1], None))
return package_files
else:
return None
except Exception as e:
logger.error("Exception in get_package_files(): %s" % e)
# get key package information which is to be shown inside ProgressDialog
def get_package_information(package_name):
logger.info("Fetching package information for %s" % package_name)
try:
pkg_name = "Unknown"
pkg_version = "Unknown"
pkg_repository = "Unknown / pacman mirrorlist not configured"
pkg_description = "Unknown"
pkg_arch = "Unknown"
pkg_url = "Unknown"
pkg_depends_on = []
pkg_conflicts_with = []
pkg_download_size = "Unknown"
pkg_installed_size = "Unknown"
pkg_build_date = "Unknown"
pkg_packager = "Unknown"
package_metadata = {}
query_local_cmd = ["pacman", "-Qi", package_name]
query_remote_cmd = ["pacman", "-Si", package_name]
process_query_remote = subprocess.run(
query_remote_cmd,
shell=False,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
timeout=process_timeout,
)
# added validation on process result
if process_query_remote.returncode == 0:
for line in process_query_remote.stdout.decode("utf-8").splitlines():
if "Name :" in line.strip():
pkg_name = line.replace(" ", "").split("Name:")[1].strip()
if "Version :" in line.strip():
pkg_version = line.replace(" ", "").split("Version:")[1].strip()
if "Repository :" in line.strip():
pkg_repository = line.split("Repository :")[1].strip()
if "Description :" in line.strip():
pkg_description = line.split("Description :")[1].strip()
if "Architecture :" in line.strip():
pkg_arch = line.split("Architecture :")[1].strip()
if "URL :" in line.strip():
pkg_url = line.split("URL :")[1].strip()
if "Depends On :" in line.strip():
if line.split("Depends On :")[1].strip() != "None":
pkg_depends_on_str = line.split("Depends On :")[1].strip()
for pkg_dep in pkg_depends_on_str.split(" "):
pkg_depends_on.append((pkg_dep, None))
else:
pkg_depends_on = []
if "Conflicts With :" in line.strip():
if line.split("Conflicts With :")[1].strip() != "None":
pkg_conflicts_with_str = line.split("Conflicts With :")[
1
].strip()
for pkg_con in pkg_conflicts_with_str.split(" "):
pkg_conflicts_with.append((pkg_con, None))
else:
pkg_conflicts_with = []
if "Download Size :" in line.strip():
pkg_download_size = line.split("Download Size :")[1].strip()
if "Installed Size :" in line.strip():
pkg_installed_size = line.split("Installed Size :")[1].strip()
if "Build Date :" in line.strip():
pkg_build_date = line.split("Build Date :")[1].strip()
if "Packager :" in line.strip():
pkg_packager = line.split("Packager :")[1].strip()
package_metadata["name"] = pkg_name
package_metadata["version"] = pkg_version
package_metadata["repository"] = pkg_repository
package_metadata["description"] = pkg_description
package_metadata["arch"] = pkg_arch
package_metadata["url"] = pkg_url
package_metadata["depends_on"] = pkg_depends_on
package_metadata["conflicts_with"] = pkg_conflicts_with
package_metadata["download_size"] = pkg_download_size
package_metadata["installed_size"] = pkg_installed_size
package_metadata["build_date"] = pkg_build_date
package_metadata["packager"] = pkg_packager
return package_metadata
elif (
"error: package '%s' was not found\n" % package_name
in process_query_remote.stdout.decode("utf-8")
):
return "error: package '%s' was not found" % package_name
else:
process_query_local = subprocess.run(
query_local_cmd,
shell=False,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
timeout=process_timeout,
)
# added validation on process result
if process_query_local.returncode == 0:
for line in process_query_local.stdout.decode("utf-8").splitlines():
if "Name :" in line.strip():
pkg_name = line.replace(" ", "").split("Name:")[1].strip()
if "Version :" in line.strip():
pkg_version = line.replace(" ", "").split("Version:")[1].strip()
if "Repository :" in line.strip():
pkg_repository = line.split("Repository :")[1].strip()
if "Description :" in line.strip():
pkg_description = line.split("Description :")[1].strip()
if "Architecture :" in line.strip():
pkg_arch = line.split("Architecture :")[1].strip()
if "URL :" in line.strip():
pkg_url = line.split("URL :")[1].strip()
if "Depends On :" in line.strip():
if line.split("Depends On :")[1].strip() != "None":
pkg_depends_on_str = line.split("Depends On :")[
1
].strip()
for pkg_dep in pkg_depends_on_str.split(" "):
pkg_depends_on.append((pkg_dep, None))
else:
pkg_depends_on = []
if "Conflicts With :" in line.strip():
if line.split("Conflicts With :")[1].strip() != "None":
pkg_conflicts_with_str = line.split("Conflicts With :")[
1
].strip()
for pkg_con in pkg_conflicts_with_str.split(" "):
pkg_conflicts_with.append((pkg_con, None))
else:
pkg_conflicts_with = []
if "Download Size :" in line.strip():
pkg_download_size = line.split("Download Size :")[1].strip()
if "Installed Size :" in line.strip():
pkg_installed_size = line.split("Installed Size :")[1].strip()
if "Build Date :" in line.strip():
pkg_build_date = line.split("Build Date :")[1].strip()
if "Packager :" in line.strip():
pkg_packager = line.split("Packager :")[1].strip()
package_metadata["name"] = pkg_name
package_metadata["version"] = pkg_version
package_metadata["repository"] = pkg_repository
package_metadata["description"] = pkg_description
package_metadata["arch"] = pkg_arch
package_metadata["url"] = pkg_url
package_metadata["depends_on"] = pkg_depends_on
package_metadata["conflicts_with"] = pkg_conflicts_with
package_metadata["download_size"] = pkg_download_size
package_metadata["installed_size"] = pkg_installed_size
package_metadata["build_date"] = pkg_build_date
package_metadata["packager"] = pkg_packager
return package_metadata
else:
return None
except Exception as e:
logger.error("Exception in get_package_information(): %s" % e)
# =====================================================
# APP QUERY
# =====================================================
def get_current_installed():
logger.debug("Get currently installed packages")
path = base_dir + "/cache/installed.lst"
# query_str = "pacman -Q > " + path
query_str = ["pacman", "-Q"]
# run the query - using Popen because it actually suits this use case a bit better.
subprocess_query = subprocess.Popen(
query_str,
shell=False,
stdout=subprocess.PIPE,
)
out, err = subprocess_query.communicate(timeout=process_timeout)
# added validation on process result
if subprocess_query.returncode == 0:
file = open(path, "w")
for line in out.decode("utf-8"):
file.write(line)
file.close()
else:
logger.warning("Failed to run %s" % query_str)
def query_pkg(package):
try:
package = package.strip()
path = base_dir + "/cache/installed.lst"
pacman_localdb = base_dir + "/cache/pacman-localdb"
if os.path.exists(path):
if is_file_stale(path, 0, 0, 30):
get_current_installed()
# file does NOT exist;
else:
get_current_installed()
# then, open the resulting list in read mode
with open(path, "r") as f:
# first we need to strip the new line escape sequence to ensure we don't get incorrect outcome
pkg = package.strip("\n")
# If the pkg name appears in the list, then it is installed
for line in f:
installed = line.split(" ")
# We only compare against the name of the package, NOT the version number.
if pkg == installed[0]:
# file.close()
return True
# We will only hit here, if the pkg does not match anything in the file.
# file.close()
return False
except Exception as e:
logger.error("Exception in query_pkg(): %s " % e)
# =====================================================
# PACKAGE DESCRIPTION CACHE AND SEARCH
# =====================================================
def cache(package, path_dir_cache):
try:
# first we need to strip the new line escape sequence to ensure we don't get incorrect outcome
pkg = package.strip()
# create the query
query_str = ["pacman", "-Si", pkg, " --noconfirm"]
# run the query - using Popen because it actually suits this use case a bit better.
process = subprocess.Popen(
query_str, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
out, err = process.communicate()
# validate the process result
if process.returncode == 0:
# out, err = process.communicate()
output = out.decode("utf-8")
if len(output) > 0:
split = output.splitlines()
# Currently the output of the pacman command above always puts the description on the 4th line.
desc = str(split[3])
# Ok, so this is a little fancy: there is formatting from the output which we wish to ignore (ends at 19th character)
# and there is a remenant of it as the last character - usually a single or double quotation mark, which we also need to ignore
description = desc[18:]
# writing to a caching file with filename matching the package name
filename = path_dir_cache + pkg
file = open(filename, "w")
file.write(description)
file.close()
return description
# There are several packages that do not return a valid process return code
# Cathing those manually via corrections folder
if process.returncode != 0:
exceptions = [
"florence",
"mintstick-bin",
"ttf-hack",
"ttf-roboto-mono",
"aisleriot",
"mailspring",
"linux-rt",
"linux-rt-headers",
"linux-rt-lts",
"linux-rt-lts-headers",
"kodi-x11",
"kodi-addons",
"sardi-icons",
]
if pkg in exceptions:
description = file_lookup(pkg, path_dir_cache + "corrections/")
return description
return "No Description Found"
except Exception as e:
logger.error("Exception in cache(): %s " % e)
# Creating an over-load so that we can use the same function, with slightly different code to get the results we need
def cache_btn():
# fraction = 1 / len(packages)
# Non Multithreaded version.
packages.sort()
number = 1
for pkg in packages:
logger.debug(str(number) + "/" + str(len(packages)) + ": Caching " + pkg)
cache(pkg, path_dir_cache)
number = number + 1
# progressbar.timeout_id = GLib.timeout_add(50, progressbar.update, fraction)
logger.debug("Caching applications finished")
# This will need to be coded to be running multiple processes eventually, since it will be manually invoked.
# process the file list
# for each file in the list, open the file
# process the file ignoring what is not what we need
# for each file line processed, we need to invoke the cache function that is not over-ridden.
def file_lookup(package, path):
# first we need to strip the new line escape sequence to ensure we don't get incorrect outcome
pkg = package.strip("\n")
output = ""
if os.path.exists(path + "corrections/" + pkg):
filename = path + "corrections/" + pkg
else:
filename = path + pkg
file = open(filename, "r")
output = file.read()
file.close()
if len(output) > 0:
return output
return "No Description Found"
def obtain_pkg_description(package):
# This is a pretty simple function now, decide how to get the information, then get it.
# processing variables.
output = ""
path = base_dir + "/cache/"
# First we need to determine whether to pull from cache or pacman.
if os.path.exists(path + package.strip("\n")):
output = file_lookup(package, path)
# file doesn't exist, so create a blank copy
else:
output = cache(package, path)
# Add the package in question to the global variable, in case recache is needed
packages.append(package)
return output
def restart_program():
os.unlink("/tmp/blackbox.lock")
python = sys.executable
os.execl(python, python, *sys.argv)
# =====================================================
# MONITOR PACMAN LOG FILE
# =====================================================
# write lines from the pacman log onto a queue, this is called from a non-blocking thread
def add_pacmanlog_queue(self):
try:
lines = []
with open(pacman_logfile, "r", encoding="utf-8") as f:
while True:
line = f.readline()
if line:
lines.append(line.encode("utf-8"))
self.pacmanlog_queue.put(lines)
else:
time.sleep(0.5)
except Exception as e:
logger.error("Exception in add_pacmanlog_queue() : %s" % e)
finally:
logger.debug("No new lines found inside the pacman log file")
# start log timer to update the textview called from a non-blocking thread
def start_log_timer(self, window_pacmanlog):
while True:
if window_pacmanlog.start_logtimer is False:
logger.debug("Stopping Pacman log monitoring timer")
return False
GLib.idle_add(update_textview_pacmanlog, self, priority=GLib.PRIORITY_DEFAULT)
time.sleep(2)
# update the textview component with new lines from the pacman log file
# To fix: Gtk-CRITICAL **: gtk_text_buffer_emit_insert: assertion 'g_utf8_validate (text, len, NULL)' failed
# Make sure the line read from the pacman log file is encoded in utf-8
# Then decode the line when inserting inside the buffer
def update_textview_pacmanlog(self):
lines = self.pacmanlog_queue.get()
try:
buffer = self.textbuffer_pacmanlog
if len(lines) > 0:
end_iter = buffer.get_end_iter()
for line in lines:
if len(line) > 0:
buffer.insert(
end_iter,
line.decode("utf-8"),
len(line),
)
except Exception as e:
logger.error("Exception in update_textview_pacmanlog() : %s" % e)
finally:
self.pacmanlog_queue.task_done()
if len(lines) > 0:
text_mark_end = buffer.create_mark("end", buffer.get_end_iter(), False)
# auto-scroll the textview to the bottom as new content is added
self.textview_pacmanlog.scroll_mark_onscreen(text_mark_end)
lines.clear()
# =====================================================
# USER SEARCH
# =====================================================
def search(self, term):
try:
logger.info('Searching for: "%s"' % term)
pkg_matches = []
category_dict = {}
whitespace = False
if term.strip():
whitespace = True
for pkg_list in self.packages.values():
for pkg in pkg_list:
if whitespace:
for te in term.split(" "):
if (
te.lower() in pkg.name.lower()
or te.lower() in pkg.description.lower()
):
# only unique name matches
if pkg not in pkg_matches:
pkg_matches.append(
pkg,
)
else:
if (
term.lower() in pkg.name.lower()
or term.lower() in pkg.description.lower()
):
pkg_matches.append(
pkg,
)
# filter the results so that each category holds a list of package
category_name = None
packages_cat = []
for pkg_match in pkg_matches:
if category_name == pkg_match.category:
packages_cat.append(pkg_match)
category_dict[category_name] = packages_cat
elif category_name is None:
packages_cat.append(pkg_match)
category_dict[pkg_match.category] = packages_cat
else:
# reset packages, new category
packages_cat = []
packages_cat.append(pkg_match)
category_dict[pkg_match.category] = packages_cat
category_name = pkg_match.category
# debug console output to display package info
"""
# print out number of results found from each category
print("[DEBUG] %s Search results.." % datetime.now().strftime("%H:%M:%S"))
for category in sorted(category_dict):
category_res_len = len(category_dict[category])
print("[DEBUG] %s %s = %s" %(
datetime.now().strftime("%H:%M:%S"),
category,
category_res_len,
)
)
"""
# sort dictionary so the category names are displayed in alphabetical order
sorted_dict = None
if len(category_dict) > 0:
sorted_dict = dict(sorted(category_dict.items()))
self.search_queue.put(
sorted_dict,
)
else:
self.search_queue.put(
None,
)
except Exception as e:
logger.error("Exception in search(): %s", e)
# =====================================================
# ARCOLINUX REPOS, KEYS AND MIRRORS
# =====================================================
def append_repo(text):
"""Append a new repo"""
try:
with open(pacman_conf, "a", encoding="utf-8") as f:
f.write("\n\n")
f.write(text)
except Exception as e:
logger.error("Exception in append_repo(): %s" % e)
def repo_exist(value):
"""check repo_exists"""
with open(pacman_conf, "r", encoding="utf-8") as f:
lines = f.readlines()
f.close()
for line in lines:
if value in line:
return True
return False
def install_snigdhaos_keyring():
try:
keyring = base_dir + "/packages/snigdhaos-keyring/"
file = os.listdir(keyring)
cmd_str = [
"pacman",
"-U",
keyring + str(file).strip("[]'"),
"--noconfirm",
]
logger.debug("%s" % " ".join(cmd_str))
with subprocess.Popen(
cmd_str,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
bufsize=1,
universal_newlines=True,
) as process:
process.wait(process_timeout)
output = []
for line in process.stdout:
output.append(line)
if process.returncode == 0:
return 0
else:
if len(output) == 0:
output.append("Error: install of Snigdha OS keyring failed")
logger.error(" ".join(output))
result_err = {}
result_err["cmd_str"] = cmd_str
result_err["output"] = output
return result_err
except Exception as e:
logger.error("Exception in install_snigdhaos_keyring(): %s" % e)
result_err = {}
result_err["cmd_str"] = cmd_str
result_err["output"] = e
return result_err
def remove_snigdhaos_keyring():
try:
cmd_str = ["pacman", "-Rdd", "snigdhaos-keyring", "--noconfirm"]
with subprocess.Popen(
cmd_str,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
bufsize=1,
universal_newlines=True,
) as process:
process.wait(process_timeout)
output = []
for line in process.stdout:
output.append(line)
if process.returncode == 0:
return 0
else:
if len(output) == 0:
output.append("Error: removal of Snigdha OS keyring failed")
logger.error(" ".join(output))
result_err = {}
result_err["cmd_str"] = cmd_str
result_err["output"] = output
return result_err
except Exception as e:
logger.error("Exception in remove_snigdhaos_keyring(): %s" % e)
result_err = {}
result_err["cmd_str"] = cmd_str
result_err["output"] = e
return result_err
def install_snigdhaos_mirrorlist():
try:
mirrorlist = base_dir + "/packages/snigdhaos-mirrorlist/"
file = os.listdir(mirrorlist)
cmd_str = [
"pacman",
"-U",
mirrorlist + str(file).strip("[]'"),
"--noconfirm",
]
logger.debug("%s" % " ".join(cmd_str))
with subprocess.Popen(
cmd_str,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
bufsize=1,
universal_newlines=True,
) as process:
process.wait(process_timeout)
output = []
for line in process.stdout:
output.append(line)
if process.returncode == 0:
return 0
else:
if len(output) == 0:
output.append("Error: install of Snigdha OS mirrorlist failed")
logger.error(" ".join(output))
result_err = {}
result_err["cmd_str"] = cmd_str
result_err["output"] = output
return result_err
except Exception as e:
logger.error("Exception in install_snigdhaos_mirrorlist(): %s" % e)
result_err = {}
result_err["cmd_str"] = cmd_str
result_err["output"] = output
return result_err
def remove_snigdhaos_mirrorlist():
try:
cmd_str = ["pacman", "-Rdd", "snigdhaos-mirrorlist-git", "--noconfirm"]
logger.debug("%s" % " ".join(cmd_str))
with subprocess.Popen(
cmd_str,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
bufsize=1,
universal_newlines=True,
) as process:
process.wait(process_timeout)
output = []
for line in process.stdout:
output.append(line)
if process.returncode == 0:
return 0
else:
if len(output) == 0:
output.append("Error: removal of Snigdha OS mirrorlist failed")
logger.error(" ".join(output))
result_err = {}
result_err["cmd_str"] = cmd_str
result_err["output"] = output
return result_err
except Exception as e:
logger.error("Exception in remove_snigdhaos_mirrorlist(): %s" % e)
result_err = {}
result_err["cmd_str"] = cmd_str
result_err["output"] = e
return result_err
def add_snigdhaos_repos():
logger.info("Adding Snigdha OS repos on %s" % distr)
try:
# first check if snigdhaos repos are already inside pacman conf file
if verify_snigdhaos_pacman_conf() is False:
# take backup of existing pacman.conf file
if os.path.exists(pacman_conf):
shutil.copy(pacman_conf, pacman_conf_backup)
# read existing contents from pacman.conf file
logger.info("Reading from %s" % pacman_conf)
lines = []
with open(pacman_conf, "r", encoding="utf-8") as r:
lines = r.readlines()
# check for existing Snigdha OS entries
if len(lines) > 0:
snigdhaos_core_found = False
snigdhaos_extra_found = False
for line in lines:
if "#" in line.strip():
if snigdhaos_core[0].replace("#", "") in line.strip():
snigdhaos_core_found = True
index = lines.index(line)
del lines[index]
lines.insert(index, snigdhaos_core[0])
index += 1
del lines[index]
lines.insert(index, snigdhaos_core[1])
index += 1
del lines[index]
lines.insert(index, snigdhaos_core[2])
if snigdhaos_extra[0].replace("#", "") in line.strip():
snigdhaos_extra_found = True
index = lines.index(line)
del lines[index]
lines.insert(index, snigdhaos_extra[0])
index += 1
del lines[index]
lines.insert(index, snigdhaos_extra[1])
index += 1
del lines[index]
lines.insert(index, snigdhaos_extra[2])
if line.strip() == snigdhaos_core[0]:
snigdhaos_core_found = True
if line.strip() == snigdhaos_extra[0]:
snigdhaos_extra_found = True
if snigdhaos_core_found is False:
lines.append("\n")
for snigdhaos_core_line in snigdhaos_core:
lines.append(snigdhaos_core_line)
if snigdhaos_extra_found is False:
lines.append("\n")
for snigdhaos_extra_line in snigdhaos_extra:
lines.append(snigdhaos_extra_line)
logger.info("[Add Snigdha OS repos] Writing to %s" % pacman_conf)
if len(lines) > 0:
with open(pacman_conf, "w", encoding="utf-8") as w:
for l in lines:
w.write(l.strip() + "\n")
w.flush()
return 0
else:
logger.error("Failed to process %s" % pacman_conf)
else:
logger.error("Failed to read %s" % pacman_conf)
else:
logger.info("Snigdha OS repos already setup inside pacman conf file")
return 0
except Exception as e:
logger.error("Exception in add_snigdhaos_repos(): %s" % e)
return e
def remove_snigdhaos_repos():
# remove the Snigdha OS repos in /etc/pacman.conf
try:
# check for existing Snigdha OS entries and remove
if verify_snigdhaos_pacman_conf() is True:
if os.path.exists(pacman_conf):
shutil.copy(pacman_conf, pacman_conf_backup)
logger.info("Reading from %s" % pacman_conf)
lines = []
with open(pacman_conf, "r", encoding="utf-8") as r:
lines = r.readlines()
if len(lines) > 0:
index = 0
for line in lines:
if "%s\n" % snigdhaos_core[0] == line:
index = lines.index("%s\n" % snigdhaos_core[0])
if index > 0:
if distr != "snigdhaos":
del lines[index]
del lines[index]
del lines[index]
else:
lines[index] = "#%s\n" % snigdhaos_core[0]
lines[index + 1] = "#%s\n" % snigdhaos_core[1]
lines[index + 2] = "#%s\n" % snigdhaos_core[2]
elif (
"#" in line.strip()
and snigdhaos_core[0] == line.replace("#", "").strip()
and distr != "snigdhaos"
):
# check if already commented
index = lines.index(line)
del lines[index]
del lines[index]
del lines[index]
if "%s\n" % snigdhaos_extra[0] == line:
index = lines.index("%s\n" % snigdhaos_extra[0])
if index > 0:
if distr != "snigdhaos":
del lines[index]
del lines[index]
del lines[index]
else:
lines[index] = "#%s\n" % snigdhaos_extra[0]
lines[index + 1] = "#%s\n" % snigdhaos_extra[1]
lines[index + 2] = "#%s\n" % snigdhaos_extra[2]
elif (
"#" in line.strip()
and snigdhaos_extra[0] == line.replace("#", "").strip()
and distr != "snigdhaos"
):
# check if already commented
index = lines.index(line)
del lines[index]
del lines[index]
del lines[index]
# remove any white spaces from end of the file only if on non snigdhaos system
# on any non snigdhaos distro lines are deleted which leaves empty lines in the file
# causing the file to grow in size
if distr != "snigdhaos":
if lines[-1] == "\n":
del lines[-1]
if lines[-2] == "\n":
del lines[-2]
if lines[-3] == "\n":
del lines[-3]
if lines[-4] == "\n":
del lines[-4]
logger.info("[Remove Snigdha OS Repos] Writing to %s" % pacman_conf)
if len(lines) > 0:
with open(pacman_conf, "w") as w:
w.writelines(lines)
w.flush()
return 0
else:
logger.error("Failed to process %s" % pacman_conf)
else:
logger.error("Failed to read %s" % pacman_conf)
else:
logger.info("No Snigdha OS repos setup inside pacman conf file")
return 0
except Exception as e:
logger.error("Exception in remove_snigdhaos_repos(): %s" % e)
return e
# check if pacman.conf has snigdhaos repos setup
def verify_snigdhaos_pacman_conf():
try:
lines = None
snigdhaos_core_setup = False
snigdhaos_extra_setup = False
with open(pacman_conf, "r") as r:
lines = r.readlines()
if lines is not None:
for line in lines:
if snigdhaos_core[0] in line.strip():
if "#" not in line.strip():
snigdhaos_core_setup = True
else:
return False
if snigdhaos_extra[0] in line.strip():
if "#" not in line.strip():
snigdhaos_extra_setup = True
else:
return False
if (
snigdhaos_core_setup is True
and snigdhaos_extra_setup is True
):
return True
else:
return False
except Exception as e:
logger.error("Exception in check_snigdhaos_pacman(): %s" % e)
# =====================================================
# CHECK IF PACKAGE IS INSTALLED
# =====================================================
# check if package is installed or not
def check_package_installed(package_name):
# query_str = ["pacman", "-Qi", package]
query_str = ["pacman", "-Qq"]
try:
process_pkg_installed = subprocess.run(
query_str,
shell=False,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
timeout=process_timeout,
universal_newlines=True,
)
if package_name in process_pkg_installed.stdout.splitlines():
return True
else:
# check if the package is in the local pacman db
if check_pacman_localdb(package_name):
return True
else:
return False
except subprocess.CalledProcessError:
# package is not installed
return False
# =====================================================
# QUERY THE LOCAL PACMAN DB FOR PACKAGE
# =====================================================
# This is used to validate a package install/uninstall
# check if package is installed or not
def check_pacman_localdb(package_name):
query_str = ["pacman", "-Qi", package_name]
try:
process_pkg_installed = subprocess.run(
query_str,
shell=False,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
timeout=process_timeout,
)
if process_pkg_installed.returncode == 0:
for line in process_pkg_installed.stdout.decode("utf-8").splitlines():
if line.startswith("Name :"):
if line.replace(" ", "").split("Name:")[1].strip() == package_name:
return True
if line.startswith("Replaces :"):
replaces = line.split("Replaces :")[1].strip()
if len(replaces) > 0:
if package_name in replaces:
return True
else:
return False
except subprocess.CalledProcessError:
# package is not installed
return False
# =====================================================
# CHECK RUNNING PROCESS
# =====================================================
def check_if_process_running(process_name):
for proc in psutil.process_iter():
try:
pinfo = proc.as_dict(attrs=["pid", "name", "create_time"])
if process_name == pinfo["pid"]:
return True
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
pass
return False
# =====================================================
# NOTIFICATIONS
# =====================================================
def show_in_app_notification(self, message, err):
if self.timeout_id is not None:
GLib.source_remove(self.timeout_id)
self.timeout_id = None
if err is True:
self.notification_label.set_markup(
'<span background="yellow" foreground="black">' + message + "</span>"
)
else:
self.notification_label.set_markup(
'<span foreground="white">' + message + "</span>"
)
self.notification_revealer.set_reveal_child(True)
self.timeout_id = GLib.timeout_add(3000, timeout, self)
def timeout(self):
close_in_app_notification(self)
def close_in_app_notification(self):
self.notification_revealer.set_reveal_child(False)
GLib.source_remove(self.timeout_id)
self.timeout_id = None
def reveal_infobar(self, progress_dialog):
progress_dialog.infobar.set_revealed(True)
progress_dialog.infobar.show_all()
GLib.source_remove(self.timeout_id)
self.timeout_id = None
"""
Since the app could be quit/terminated at any time during a pacman transaction.
The pacman process spawned by the install/uninstall threads, needs to be terminated too.
Otherwise the app may hang waiting for pacman to complete its transaction.
"""
# =====================================================
# PACMAN
# =====================================================
def terminate_pacman():
try:
process_found = False
for proc in psutil.process_iter():
try:
pinfo = proc.as_dict(attrs=["pid", "name", "create_time"])
if pinfo["name"] == "pacman":
process_found = True
logger.debug("Killing pacman process = %s" % pinfo["name"])
proc.kill()
except (psutil.NoSuchProcess, psutil.AccessDenied):
continue
if process_found is True:
if check_pacman_lockfile():
os.unlink(pacman_lockfile)
except Exception as e:
logger.error("Exception in terminate_pacman() : %s" % e)
def is_thread_alive(thread_name):
for thread in threading.enumerate():
if thread.name == thread_name and thread.is_alive():
return True
return False
def print_running_threads():
threads_alive = []
for thread in threading.enumerate():
if thread.is_alive():
threads_alive.append(thread.name)
for th in threads_alive:
logger.debug("Thread = %s status = alive" % th)
# this keeps monitoring for items on the package holding queue
# items are added to the queue if a package install is stuck behind another pacman transaction
def check_holding_queue(self):
while True:
(
package,
action,
widget,
cmd_str,
progress_dialog,
) = self.pkg_holding_queue.get()
try:
# logger.debug("Enqueued package = %s" % package.name)
while check_pacman_lockfile() is True:
# logger.debug("Pacman is processing a transaction")
time.sleep(0.2)
th_subprocess = Thread(
name="thread_subprocess",
target=start_subprocess,
args=(
self,
cmd_str,
progress_dialog,
action,
package,
widget,
),
daemon=True,
)
th_subprocess.start()
finally:
self.pkg_holding_queue.task_done()
# check if pacman lock file exists
def check_pacman_lockfile():
try:
if os.path.exists(pacman_lockfile):
# logger.debug("Pacman lockfile found inside %s" % pacman_lockfile)
# logger.debug("Another pacman process is running")
return True
else:
# logger.info("No pacman lockfile found, OK to proceed")
return False
except Exception as e:
logger.error("Exception in check_pacman_lockfile() : %s" % e)
# this gets info on the pacman process currently running
def get_pacman_process():
try:
for proc in psutil.process_iter():
try:
pinfo = proc.as_dict(attrs=["pid", "name", "create_time"])
if pinfo["name"] == "pacman":
return " ".join(proc.cmdline())
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
pass
except Exception as e:
logger.error("Exception in get_pacman_process() : %s" % e)
# used inside PackageImportDialog to display package installation progress
def update_package_import_textview(self, line):
try:
if len(line) > 0:
self.msg_buffer.insert(
self.msg_buffer.get_end_iter(),
" %s" % line,
len(" %s" % line),
)
except Exception as e:
logger.error("Exception in update_progress_textview(): %s" % e)
finally:
self.pkg_import_queue.task_done()
text_mark_end = self.msg_buffer.create_mark(
"end", self.msg_buffer.get_end_iter(), False
)
# scroll to the end of the textview
self.textview.scroll_mark_onscreen(text_mark_end)
def monitor_package_import(self):
while True:
if self.stop_thread is True:
break
message = self.pkg_import_queue.get()
GLib.idle_add(
update_package_import_textview,
self,
message,
priority=GLib.PRIORITY_DEFAULT,
)
# time.sleep(0.2)
# update the package install status label called from outside the main thread
def update_package_status_label(label, text):
label.set_markup(text)
def import_packages(self):
try:
packages_status_list = []
package_failed = False
package_err = {}
count = 0
# clean pacman cache
if os.path.exists(pacman_cache_dir):
query_pacman_clean_cache_str = ["pacman", "-Sc", "--noconfirm"]
logger.info("Cleaning Pacman cache directory = %s" % pacman_cache_dir)
event = "%s [INFO]: Cleaning pacman cache\n" % datetime.now().strftime(
"%Y-%m-%d-%H-%M-%S"
)
self.pkg_import_queue.put(event)
GLib.idle_add(
update_package_status_label,
self.label_package_status,
"Status: <b>Cleaning pacman cache</b>",
)
# clean the pacman cache, so we don't run into any invalid/corrupt package errors during install
process_pacman_cc = subprocess.Popen(
query_pacman_clean_cache_str,
shell=False,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True,
)
out, err = process_pacman_cc.communicate(timeout=process_timeout)
self.pkg_import_queue.put(out)
if process_pacman_cc.returncode == 0:
logger.info("Pacman cache directory cleaned")
else:
logger.error("Failed to clean Pacman cache directory")
logger.info("Running full system upgrade")
# run full system upgrade, Arch does not allow partial package updates
query_str = ["pacman", "-Syu", "--noconfirm"]
# query_str = ["pacman", "-Qqen"]
logger.info("Running %s" % " ".join(query_str))
event = "%s [INFO]:Running full system upgrade\n" % datetime.now().strftime(
"%Y-%m-%d-%H-%M-%S"
)
self.pkg_import_queue.put(event)
GLib.idle_add(
update_package_status_label,
self.label_package_status,
"Status: <b>Performing full system upgrade - do not power off your system</b>",
)
output = []
with subprocess.Popen(
query_str,
shell=False,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
bufsize=1,
universal_newlines=True,
) as process:
while True:
if process.poll() is not None:
break
for line in process.stdout:
# print(line.strip())
self.pkg_import_queue.put(line)
output.append(line)
# time.sleep(0.2)
if process.returncode == 0:
logger.info("Pacman system upgrade completed")
GLib.idle_add(
update_package_status_label,
self.label_package_status,
"Status: <b> Full system upgrade - completed</b>",
)
else:
if len(output) > 0:
if "there is nothing to do" not in output:
logger.error("Pacman system upgrade failed")
GLib.idle_add(
update_package_status_label,
self.label_package_status,
"Status: <b> Full system upgrade - failed</b>",
)
print("%s" % " ".join(output))
event = "%s [ERROR]: Installation of packages aborted due to errors\n" % datetime.now().strftime(
"%Y-%m-%d-%H-%M-%S"
)
self.pkg_import_queue.put(event)
logger.error("Installation of packages aborted due to errors")
return
# do not proceed with package installs if system upgrade fails
else:
return
# iterate through list of packages, calling pacman -S on each one
for package in self.packages_list:
process_output = []
package = package.strip()
if len(package) > 0:
if "#" not in package:
query_str = ["pacman", "-S", package, "--needed", "--noconfirm"]
count += 1
logger.info("Running %s" % " ".join(query_str))
event = "%s [INFO]: Running %s\n" % (
datetime.now().strftime("%Y-%m-%d-%H-%M-%S"),
" ".join(query_str),
)
self.pkg_import_queue.put(event)
with subprocess.Popen(
query_str,
shell=False,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
bufsize=1,
universal_newlines=True,
) as process:
while True:
if process.poll() is not None:
break
for line in process.stdout:
process_output.append(line.strip())
self.pkg_import_queue.put(line)
# time.sleep(0.2)
if process.returncode == 0:
# since this is being run in another thread outside of main, use GLib to update UI component
GLib.idle_add(
update_package_status_label,
self.label_package_status,
"Status: <b>%s</b> -> <b>Installed</b>" % package,
)
GLib.idle_add(
update_package_status_label,
self.label_package_count,
"Progress: <b>%s/%s</b>"
% (count, len(self.packages_list)),
)
packages_status_list.append("%s -> Installed" % package)
else:
logger.error("%s --> Install failed" % package)
GLib.idle_add(
update_package_status_label,
self.label_package_status,
"Status: <b>%s</b> -> <b>Install failed</b>" % package,
)
GLib.idle_add(
update_package_status_label,
self.label_package_count,
"Progress: <b>%s/%s</b>"
% (count, len(self.packages_list)),
)
if len(process_output) > 0:
if "there is nothing to do" not in process_output:
logger.error("%s" % " ".join(process_output))
# store package error in dict
package_err[package] = " ".join(process_output)
package_failed = True
packages_status_list.append("%s -> Failed" % package)
if len(packages_status_list) > 0:
self.pkg_status_queue.put(packages_status_list)
if package_failed is True:
GLib.idle_add(
update_package_status_label,
self.label_package_status,
"<b>Some packages have failed to install see %s</b>" % self.logfile,
)
# end
event = "%s [INFO]: Completed, check the logfile for any errors\n" % (
datetime.now().strftime("%Y-%m-%d-%H-%M-%S"),
)
self.pkg_import_queue.put(event)
except Exception as e:
logger.error("Exception in import_packages(): %s" % e)
finally:
self.pkg_err_queue.put(package_err)
# package install completed now log status to log file
def log_package_status(self):
logger.info("Logging package status")
packages_status_list = None
package_err = None
while True:
try:
time.sleep(0.2)
packages_status_list = self.pkg_status_queue.get()
package_err = self.pkg_err_queue.get()
finally:
self.pkg_status_queue.task_done()
self.pkg_err_queue.task_done()
with open(self.logfile, "w") as f:
f.write(
"# This file was auto-generated by BlackBox on %s at %s\n"
% (
datetime.today().date(),
datetime.now().strftime("%H:%M:%S"),
),
)
if packages_status_list is not None:
for package in packages_status_list:
if package.split("->")[0].strip() in package_err:
f.write("%s\n" % package)
f.write(
"\tERROR: %s\n"
% package_err[package.split("->")[0].strip()]
)
else:
f.write("%s\n" % package)
break
# open blackbox log directory
def open_log_dir():
try:
subprocess.Popen(
["sudo", "-u", sudo_username, "xdg-open", log_dir],
shell=False,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
except Exception as e:
logger.error("Exception in open_log_dir(): %s" % e)
# ANYTHING UNDER THIS LINE IS CURRENTLY UNUSED!