Merge pull request #20 from iconized/master

Trigger #b6e4aea78f129a0d218f70bea64fdfbda018a0e5
This commit is contained in:
Abhiraj Roy (iconized)
2024-04-25 16:38:57 +05:30
committed by GitHub
4 changed files with 615 additions and 9 deletions

1
.gitignore vendored
View File

@@ -1,3 +1,2 @@
__pycache__/ __pycache__/
**.lst **.lst

View File

@@ -85,7 +85,7 @@ def permissions(dst):
subprocess.call(["chown", "-R", sudo_username + ":" + group, dst], shell=False) subprocess.call(["chown", "-R", sudo_username + ":" + group, dst], shell=False)
except Exception as e: except Exception as e:
logger.error( logger.error(
"Exception occured in LOC68: %s" % e "[Exception] permissions() : %s" % e
) )
# NOTE: Creating Log, Export and Config Directory: # NOTE: Creating Log, Export and Config Directory:
@@ -152,7 +152,7 @@ try:
logger.addHandler(ch) logger.addHandler(ch)
logger.addHandler(tfh) logger.addHandler(tfh)
except Exception as e: except Exception as e:
print("[ERROR] Exception in LOC109: %s" % e) print("Found Exception in LOC109: %s" % e)
# NOTE : On app close create package file # NOTE : On app close create package file
def _on_close_create_package_file(): def _on_close_create_package_file():
try: try:
@@ -174,7 +174,7 @@ def _on_close_create_package_file():
for line in process.stdout: for line in process.stdout:
f.write("%s" %line) f.write("%s" %line)
except Exception as e: except Exception as e:
logger.error("[ERROR] Exception in LOC158: %s" % e) logger.error("[Exception] _on_close_create_package_file(): %s" % e)
# NOTE: Global Functions # NOTE: Global Functions
def _get_position(lists, value): def _get_position(lists, value):
@@ -208,7 +208,7 @@ def sync_package_db():
"-Sy", "-Sy",
] ]
logger.info( logger.info(
"Synchronizing Package Database..." "[INFO] Synchronizing Package Database..."
) )
process_sync = subprocess.run( process_sync = subprocess.run(
sync_str, sync_str,
@@ -226,7 +226,7 @@ def sync_package_db():
return out return out
except Exception as e: except Exception as e:
logger.error( logger.error(
"[ERROR] Exception in LOC206: %s" % e "[Exception] sync_package_db() : %s" % e
) )
def sync_file_db(): def sync_file_db():
@@ -236,7 +236,7 @@ def sync_file_db():
"-Fy", "-Fy",
] ]
logger.info( logger.info(
"Synchronizing File Database..." "[INFO] Synchronizing File Database..."
) )
process_sync = subprocess.run( process_sync = subprocess.run(
sync_str, sync_str,
@@ -255,7 +255,7 @@ def sync_file_db():
return out return out
except Exception as e: except Exception as e:
logger.error( logger.error(
"[ERROR] Exception in LOC234: %s" % e "[Exception] sync_file_db() : %s" % e
) )
# NOTE: Installation & Uninstallation Process # NOTE: Installation & Uninstallation Process
@@ -1898,3 +1898,296 @@ def get_pacman_process():
logger.error( logger.error(
"Found Exception in LOC1888: %s" % e "Found Exception in LOC1888: %s" % e
) )
def cache_btn():
# fraction = 1 / len(packages)
packages.sort()
number = 1
for pkg in packages:
logger.debug(str(number) + "/" + str(len(packages)) + ": Caching " + pkg)
cache(pkg, path_dir_cache)
number = number + 1
# progressbar.timeout_id = GLib.timeout_add(50, progressbar.update, fraction)
logger.debug("Caching applications finished")
def restart_program():
os.unlink("/tmp/blackbox.lock")
python = sys.executable
os.execl(python, python, *sys.argv)
def show_in_app_notification(self, message, err):
if self.timeout_id is not None:
GLib.source_remove(self.timeout_id)
self.timeout_id = None
if err is True:
self.notification_label.set_markup(
'<span background="yellow" foreground="black">' + message + "</span>"
)
else:
self.notification_label.set_markup(
'<span foreground="white">' + message + "</span>"
)
self.notification_revealer.set_reveal_child(True)
self.timeout_id = GLib.timeout_add(3000, timeout, self)
def timeout(self):
close_in_app_notification(self)
def close_in_app_notification(self):
self.notification_revealer.set_reveal_child(False)
GLib.source_remove(self.timeout_id)
self.timeout_id = None
def update_package_import_textview(self, line):
try:
if len(line) > 0:
self.msg_buffer.insert(
self.msg_buffer.get_end_iter(),
" %s" % line,
len(" %s" % line),
)
except Exception as e:
logger.error("[Exception] update_progress_textview(): %s" % e)
finally:
self.pkg_import_queue.task_done()
text_mark_end = self.msg_buffer.create_mark(
"end", self.msg_buffer.get_end_iter(), False
)
# scroll to the end of the textview
self.textview.scroll_mark_onscreen(text_mark_end)
def monitor_package_import(self):
while True:
if self.stop_thread is True:
break
message = self.pkg_import_queue.get()
GLib.idle_add(
update_package_import_textview,
self,
message,
priority=GLib.PRIORITY_DEFAULT,
)
def update_package_status_label(label, text):
label.set_markup(text)
def import_packages(self):
try:
packages_status_list = []
package_failed = False
package_err = {}
count = 0
if os.path.exists(pacman_cache_dir):
query_pacman_clean_cache_str = [
"pacman",
"-Sc",
"--noconfirm",
]
logger.info(
"Cleaning Pacman cache directory = %s" % pacman_cache_dir
)
event = "%s [INFO]: Cleaning pacman cache\n" % datetime.now().strftime(
"%Y-%m-%d-%H-%M-%S"
)
self.pkg_import_queue.put(event)
GLib.idle_add(
update_package_status_label,
self.label_package_status,
"Status: <b>Cleaning pacman cache</b>",
)
process_pacman_cc = subprocess.Popen(
query_pacman_clean_cache_str,
shell=False,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True,
)
out, err = process_pacman_cc.communicate(timeout=process_timeout)
self.pkg_import_queue.put(out)
if process_pacman_cc.returncode == 0:
logger.info("Pacman cache directory cleaned")
else:
logger.error("Failed to clean Pacman cache directory")
logger.info("Running full system upgrade")
# run full system upgrade, Arch does not allow partial package updates
query_str = ["pacman", "-Syu", "--noconfirm"]
# query_str = ["pacman", "-Qqen"]
logger.info("Running %s" % " ".join(query_str))
event = "%s [INFO]:Running full system upgrade\n" % datetime.now().strftime(
"%Y-%m-%d-%H-%M-%S"
)
self.pkg_import_queue.put(event)
GLib.idle_add(
update_package_status_label,
self.label_package_status,
"Status: <b>Performing full system upgrade - do not power off your system</b>",
)
output = []
with subprocess.Popen(
query_str,
shell=False,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
bufsize=1,
universal_newlines=True,
) as process:
while True:
if process.poll() is not None:
break
for line in process.stdout:
# print(line.strip())
self.pkg_import_queue.put(line)
output.append(line)
# time.sleep(0.2)
if process.returncode == 0:
logger.info("Pacman system upgrade completed")
GLib.idle_add(
update_package_status_label,
self.label_package_status,
"Status: <b> Full system upgrade - completed</b>",
)
else:
if len(output) > 0:
if "there is nothing to do" not in output:
logger.error("Pacman system upgrade failed")
GLib.idle_add(
update_package_status_label,
self.label_package_status,
"Status: <b> Full system upgrade - failed</b>",
)
print("%s" % " ".join(output))
event = "%s [ERROR]: Installation of packages aborted due to errors\n" % datetime.now().strftime(
"%Y-%m-%d-%H-%M-%S"
)
self.pkg_import_queue.put(event)
logger.error("Installation of packages aborted due to errors")
return
# do not proceed with package installs if system upgrade fails
else:
return
# iterate through list of packages, calling pacman -S on each one
for package in self.packages_list:
process_output = []
package = package.strip()
if len(package) > 0:
if "#" not in package:
query_str = ["pacman", "-S", package, "--needed", "--noconfirm"]
count += 1
logger.info("Running %s" % " ".join(query_str))
event = "%s [INFO]: Running %s\n" % (
datetime.now().strftime("%Y-%m-%d-%H-%M-%S"),
" ".join(query_str),
)
self.pkg_import_queue.put(event)
with subprocess.Popen(
query_str,
shell=False,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
bufsize=1,
universal_newlines=True,
) as process:
while True:
if process.poll() is not None:
break
for line in process.stdout:
process_output.append(line.strip())
self.pkg_import_queue.put(line)
# time.sleep(0.2)
if process.returncode == 0:
# since this is being run in another thread outside of main, use GLib to update UI component
GLib.idle_add(
update_package_status_label,
self.label_package_status,
"Status: <b>%s</b> -> <b>Installed</b>" % package,
)
GLib.idle_add(
update_package_status_label,
self.label_package_count,
"Progress: <b>%s/%s</b>"
% (count, len(self.packages_list)),
)
packages_status_list.append("%s -> Installed" % package)
else:
logger.error("%s --> Install failed" % package)
GLib.idle_add(
update_package_status_label,
self.label_package_status,
"Status: <b>%s</b> -> <b>Install failed</b>" % package,
)
GLib.idle_add(
update_package_status_label,
self.label_package_count,
"Progress: <b>%s/%s</b>"
% (count, len(self.packages_list)),
)
if len(process_output) > 0:
if "there is nothing to do" not in process_output:
logger.error("%s" % " ".join(process_output))
# store package error in dict
package_err[package] = " ".join(process_output)
package_failed = True
packages_status_list.append("%s -> Failed" % package)
if len(packages_status_list) > 0:
self.pkg_status_queue.put(packages_status_list)
if package_failed is True:
GLib.idle_add(
update_package_status_label,
self.label_package_status,
"<b>Some packages have failed to install see %s</b>" % self.logfile,
)
# end
event = "%s [INFO]: Completed, check the logfile for any errors\n" % (
datetime.now().strftime("%Y-%m-%d-%H-%M-%S"),
)
self.pkg_import_queue.put(event)
except Exception as e:
logger.error("Exception in import_packages(): %s" % e)
finally:
self.pkg_err_queue.put(package_err)
def log_package_status(self):
logger.info("Logging package status")
packages_status_list = None
package_err = None
while True:
try:
time.sleep(0.2)
packages_status_list = self.pkg_status_queue.get()
package_err = self.pkg_err_queue.get()
finally:
self.pkg_status_queue.task_done()
self.pkg_err_queue.task_done()
with open(self.logfile, "w") as f:
f.write(
"# This file was auto-generated by Sofirem on %s at %s\n"
% (
datetime.today().date(),
datetime.now().strftime("%H:%M:%S"),
),
)
if packages_status_list is not None:
for package in packages_status_list:
if package.split("->")[0].strip() in package_err:
f.write("%s\n" % package)
f.write(
"\tERROR: %s\n"
% package_err[package.split("->")[0].strip()]
)
else:
f.write("%s\n" % package)
break
def open_log_dir():
try:
subprocess.Popen(
["sudo", "-u", sudo_username, "xdg-open", log_dir],
shell=False,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
except Exception as e:
logger.error("Exception in open_log_dir(): %s" % e)

View File

@@ -0,0 +1,300 @@
import os
import sys
import shutil
import psutil
import datetime
# import time
import subprocess
import threading # noqa
import gi
# import configparser
gi.require_version("Gtk", "3.0")
from gi.repository import GLib, Gtk # noqa
log_dir = "/var/log/snigdhaos/"
aai_log_dir = "/var/log/snigdhaos/aai/"
def create_log(self):
print("Making log in /var/log/snigdhaos")
now = datetime.datetime.now()
time = now.strftime("%Y-%m-%d-%H-%M-%S")
destination = aai_log_dir + "aai-log-" + time
command = "sudo pacman -Q > " + destination
subprocess.call(
command,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
def _get_position(lists, value):
data = [string for string in lists if value in string]
position = lists.index(data[0])
return position
def permissions(dst):
try:
groups = subprocess.run(
[
"sh",
"-c",
"id " + sudo_username
],
shell=False,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
for x in groups.stdout.decode().split(" "):
if "gid" in x:
g = x.split("(")[1]
group = g.replace(")", "").strip()
subprocess.call(
[
"chown",
"-R",
sudo_username + ":" + group,
dst
],
shell=False,
)
except Exception as e:
print(e)
sudo_username = os.getlogin()
home = "/home/" + str(sudo_username)
sddm_default = "/etc/sddm.conf"
sddm_default_original = "/usr/local/share/snigdhaos/sddm/sddm.conf"
sddm_default_d1 = "/etc/sddm.conf"
sddm_default_d2 = "/etc/sddm.conf.d/kde_settings.conf"
sddm_default_d2_dir = "/etc/sddm.conf.d/"
sddm_default_d_sddm_original_1 = "/usr/local/share/snigdhaos/sddm.conf.d/sddm.conf"
sddm_default_d_sddm_original_2 = (
"/usr/local/share/snigdhaos/sddm.conf.d/kde_settings.conf"
)
if os.path.exists("/etc/sddm.conf.d/kde_settings.conf"):
sddm_conf = "/etc/sddm.conf.d/kde_settings.conf"
else:
sddm_conf = "/etc/sddm.conf"
snigdhaos_mirrorlist = "/etc/pacman.d/snigdhaos-mirrorlist"
snigdhaos_mirrorlist_original = "/usr/local/share/snigdhaos/snigdhaos-mirrorlist"
pacman = "/etc/pacman.conf"
neofetch_config = home + "/.config/neofetch/config.conf"
autostart = home + "/.config/autostart/"
srepo = "[snigdhaos-core]\n\
SigLevel = Required DatabaseOptional\n\
Include = /etc/pacman.d/snigdhaos-mirrorlist"
serepo = "[snigdhaos-extra]\n\
SigLevel = Required DatabaseOptional\n\
Include = /etc/pacman.d/snigdhaos-mirrorlist"
def show_in_app_notification(self, message):
if self.timeout_id is not None:
GLib.source_remove(self.timeout_id)
self.timeout_id = None
self.notification_label.set_markup(
'<span foreground="white">' + message + "</span>"
)
self.notification_revealer.set_reveal_child(True)
self.timeout_id = GLib.timeout_add(3000, timeOut, self)
def timeOut(self):
close_in_app_notification(self)
def close_in_app_notification(self):
self.notification_revealer.set_reveal_child(False)
GLib.source_remove(self.timeout_id)
self.timeout_id = None
def test(dst):
for root, dirs, filesr in os.walk(dst):
# print(root)
for folder in dirs:
pass
# print(dst + "/" + folder)
for file in filesr:
pass
# print(dst + "/" + folder + "/" + file)
for file in filesr:
pass
def copy_func(src, dst, isdir=False):
if isdir:
subprocess.run(["cp", "-Rp", src, dst], shell=False)
else:
subprocess.run(["cp", "-p", src, dst], shell=False)
def source_shell(self):
process = subprocess.run(["sh", "-c", 'echo "$SHELL"'], stdout=subprocess.PIPE)
output = process.stdout.decode().strip()
print(output)
if output == "/bin/bash":
subprocess.run(
[
"bash",
"-c",
"su - " + sudo_username + ' -c "source ' + home + '/.bashrc"',
],
stdout=subprocess.PIPE,
)
elif output == "/bin/zsh":
subprocess.run(
["zsh", "-c", "su - " + sudo_username + ' -c "source ' + home + '/.zshrc"'],
stdout=subprocess.PIPE,
)
def run_as_user(script):
subprocess.call(["su - " + sudo_username + " -c " + script], shell=False)
def MessageBox(self, title, message):
md2 = Gtk.MessageDialog(
parent=self,
flags=0,
message_type=Gtk.MessageType.INFO,
buttons=Gtk.ButtonsType.OK,
text=title,
)
md2.format_secondary_markup(message)
md2.run()
md2.destroy()
def rgb_to_hex(rgb):
if "rgb" in rgb:
rgb = rgb.replace("rgb(", "").replace(")", "")
vals = rgb.split(",")
return "#{0:02x}{1:02x}{2:02x}".format(
clamp(int(vals[0])), clamp(int(vals[1])), clamp(int(vals[2]))
)
return rgb
def clamp(x):
return max(0, min(x, 255))
def _get_variable(lists, value):
data = [string for string in lists if value in string]
if len(data) >= 1:
data1 = [string for string in data if "#" in string]
for i in data1:
if i[:4].find("#") != -1:
data.remove(i)
if data:
data_clean = [data[0].strip("\n").replace(" ", "")][0].split("=")
return data_clean
def check_value(list, value):
data = [string for string in list if value in string]
if len(data) >= 1:
data1 = [string for string in data if "#" in string]
for i in data1:
if i[:4].find("#") != -1:
data.remove(i)
return data
def check_backups(now):
if not os.path.exists(home + "/" + bd + "/Backup-" + now.strftime("%Y-%m-%d %H")):
os.makedirs(home + "/" + bd + "/Backup-" + now.strftime("%Y-%m-%d %H"), 0o777)
permissions(home + "/" + bd + "/Backup-" + now.strftime("%Y-%m-%d %H"))
def file_check(file):
if os.path.isfile(file):
return True
return False
def path_check(path):
if os.path.isdir(path):
return True
return False
def gtk_check_value(my_list, value):
data = [string for string in my_list if value in string]
if len(data) >= 1:
data1 = [string for string in data if "#" in string]
for i in data1:
if i[:4].find("#") != -1:
data.remove(i)
return data
def gtk_get_position(my_list, value):
data = [string for string in my_list if value in string]
position = my_list.index(data[0])
return position
def get_shortcuts(conflist):
sortcuts = _get_variable(conflist, "shortcuts")
shortcuts_index = _get_position(conflist, sortcuts[0])
return int(shortcuts_index)
def get_commands(conflist):
commands = _get_variable(conflist, "commands")
commands_index = _get_position(conflist, commands[0])
return int(commands_index)
def check_lightdm_value(list, value):
data = [string for string in list if value in string]
return data
def check_sddm_value(list, value):
data = [string for string in list if value in string]
return data
def hblock_get_state(self):
lines = int(
subprocess.check_output("wc -l /etc/hosts", shell=True).strip().split()[0]
)
if os.path.exists("/usr/local/bin/hblock") and lines > 100:
return True
self.firstrun = False
return False
def do_pulse(data, prog):
prog.pulse()
return True
def copytree(self, src, dst, symlinks=False, ignore=None): # noqa
if not os.path.exists(dst):
os.makedirs(dst)
for item in os.listdir(src):
s = os.path.join(src, item)
d = os.path.join(dst, item)
if os.path.exists(d):
try:
shutil.rmtree(d)
except Exception as e:
print(e)
os.unlink(d)
if os.path.isdir(s):
try:
shutil.copytree(s, d, symlinks, ignore)
except Exception as e:
print(e)
print("ERROR2")
self.ecode = 1
else:
try:
shutil.copy2(s, d)
except: # noqa
print("ERROR3")
self.ecode = 1
def checkIfProcessRunning(processName):
for proc in psutil.process_iter():
try:
pinfo = proc.as_dict(attrs=["pid", "name", "create_time"])
if processName == pinfo["pid"]:
return True
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
pass
return False
def restart_program():
python = sys.executable
os.execl(python, python, *sys.argv)

View File

@@ -1 +1,15 @@
#!/bin/python #!/bin/python
# UI modules
from ui.GUI import GUI
from ui.SplashScreen import SplashScreen
from ui.ProgressBarWindow import ProgressBarWindow
from ui.AppFrameGUI import AppFrameGUI
from ui.AboutDialog import AboutDialog
from ui.MessageDialog import MessageDialog
from ui.PacmanLogWindow import PacmanLogWindow
from ui.PackageListDialog import PackageListDialog
from ui.ProgressDialog import ProgressDialog
# from ui.ISOPackagesWindow import ISOPackagesWindow
from ui.PackageSearchWindow import PackageSearchWindow
from ui.PackagesImportDialog import PackagesImportDialog