From bb257d06e8e03086bc3702dd542d5cbc96e29549 Mon Sep 17 00:00:00 2001 From: Giovanni Harting <539@idlegandalf.com> Date: Tue, 25 May 2021 20:08:18 +0200 Subject: [PATCH] moved adding to db to main thread In there we can bundle all adds and can shutdown cleaner. Also this fixes split-packages not getting added. --- master.py | 72 ++++++++++++++++++++++++++++++++----------------------- 1 file changed, 42 insertions(+), 30 deletions(-) diff --git a/master.py b/master.py index f64da23..d671b32 100644 --- a/master.py +++ b/master.py @@ -11,7 +11,7 @@ import signal import subprocess import sys import time -from multiprocessing import Pool, current_process, Lock, JoinableQueue +from multiprocessing import Pool, current_process, JoinableQueue, Lock import yaml from humanfriendly import format_timespan @@ -25,7 +25,9 @@ regex_march = re.compile(r"(-march=)(.+?) ", re.MULTILINE) regex_validkeys = re.compile(r"^validpgpkeys\+?=\((.*?)\)", re.MULTILINE | re.DOTALL) fp = None update_last = time.time() -repo_lock = Lock() +copy_l = Lock() +to_add = {} +to_add_l = Lock() def build(pkgbuild, repo) -> None: @@ -77,31 +79,15 @@ def build(pkgbuild, repo) -> None: # copying pkgs.extend(glob.glob("*.pkg.tar.zst.sig")) - for pkg in pkgs: - logging.debug("[%s/%s/%s] Copy %s to %s", process_name, repo, name, pkg, - os.path.join(config["basedir"]["repo"], repo, "os", config["arch"] + "/")) - shutil.copy2(pkg, os.path.join(config["basedir"]["repo"], repo, "os", config["arch"] + "/")) + with copy_l: + for pkg in pkgs: + logging.debug("[%s/%s/%s] Copy %s to %s", process_name, repo, name, pkg, + os.path.join(config["basedir"]["repo"], repo, "os", config["arch"] + "/")) + shutil.copy2(pkg, os.path.join(config["basedir"]["repo"], repo, "os", config["arch"] + "/")) # repo - repo_lock.acquire() - r_res = subprocess.run(["repo-add", "-s", "-v", - os.path.join(config["basedir"]["repo"], repo, "os", config["arch"], - repo + ".db.tar.xz"), - pkgs[0]], stdout=subprocess.PIPE, stderr=subprocess.STDOUT) - logging.debug("[REPO-ADD] %s", r_res.stdout.decode()) - if r_res.returncode: - logging.error("[%s/%s/%s] Repo action failed: %s", process_name, repo, name, r_res.stdout.decode()) - repo_lock.release() - return - - p_res = subprocess.run( - ["paccache", "-rc", os.path.join(config["basedir"]["repo"], repo, "os", config["arch"]), "-k", "1"], - stdout=subprocess.PIPE, stderr=subprocess.STDOUT) - logging.debug("[PACCACHE] %s", p_res.stdout.decode()) - repo_lock.release() - if p_res.returncode: - logging.error("[%s/%s/%s] Repo cleanup failed: %s", process_name, repo, name, p_res.stdout.decode()) - return + with to_add_l: + to_add[repo].extend(glob.glob("*.pkg.tar.zst")) logging.info("[%s/%s/%s] Build successful (%s)", process_name, repo, name, format_timespan(time.time() - start_time)) @@ -119,6 +105,28 @@ def run_worker() -> None: os.chdir(sys.path[0]) +def do_repo_work(): + with to_add_l: + for repo in to_add: + if to_add[repo]: + args = ["repo-add", "-s", "-v", + os.path.join(config["basedir"]["repo"], repo, "os", config["arch"], repo + ".db.tar.xz")] + args.extend(to_add[repo]) + r_res = subprocess.run(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + logging.debug("[REPO-ADD] %s", r_res.stdout.decode()) + if r_res.returncode: + logging.error("[%s] Repo action failed: %s", repo, r_res.stdout.decode()) + return + + p_res = subprocess.run( + ["paccache", "-rc", os.path.join(config["basedir"]["repo"], repo, "os", config["arch"]), "-k", "1"], + stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + logging.debug("[PACCACHE] %s", p_res.stdout.decode()) + if p_res.returncode: + logging.error("[%s] Repo cleanup failed: %s", repo, p_res.stdout.decode()) + return + + def already_running() -> bool: global fp fp = os.open(f"/tmp/alhp.lock", os.O_WRONLY | os.O_CREAT) @@ -279,6 +287,8 @@ def sync_marchs_with_config() -> None: logging.info("Repos: %s", repo_quota) repos_create = list(set(repo_quota) - set(repos)) repos_delete = list(set(repos) - set(repo_quota)) + for repo in repo_quota: + to_add[repo] = [] for repo in repos_create: logging.debug("Create repo %s: %s", repo, os.path.join(config["basedir"]["repo"], repo, "os/x86_64")) @@ -369,10 +379,12 @@ if __name__ == '__main__': if q.qsize() > 0: logging.info("New Queue size: %d", q.qsize()) else: + do_repo_work() time.sleep(60) except KeyboardInterrupt: - repo_lock.acquire() - pool.close() - pool.terminate() - q.close() - sys.exit(0) + with copy_l: + pool.close() + pool.terminate() + q.close() + do_repo_work() + sys.exit(0)