switched to weighted build-queue
This commit is contained in:
127
utils.go
127
utils.go
@@ -10,6 +10,7 @@ import (
|
||||
paconf "github.com/Morganamilo/go-pacmanconf"
|
||||
"github.com/Morganamilo/go-srcinfo"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"golang.org/x/sync/semaphore"
|
||||
"io"
|
||||
"io/fs"
|
||||
"lukechampine.com/blake3"
|
||||
@@ -17,6 +18,7 @@ import (
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
"regexp"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
@@ -51,17 +53,11 @@ var (
|
||||
)
|
||||
|
||||
type BuildManager struct {
|
||||
build map[string]chan *ProtoPackage
|
||||
parse chan *ProtoPackage
|
||||
repoPurge map[string]chan []*ProtoPackage
|
||||
repoAdd map[string]chan []*ProtoPackage
|
||||
exit bool
|
||||
buildWG sync.WaitGroup
|
||||
parseWG sync.WaitGroup
|
||||
repoWG sync.WaitGroup
|
||||
buildProcesses []*os.Process
|
||||
buildProcMutex sync.RWMutex
|
||||
alpmMutex sync.RWMutex
|
||||
repoPurge map[string]chan []*ProtoPackage
|
||||
repoAdd map[string]chan []*ProtoPackage
|
||||
repoWG sync.WaitGroup
|
||||
alpmMutex sync.RWMutex
|
||||
sem *semaphore.Weighted
|
||||
}
|
||||
|
||||
type Conf struct {
|
||||
@@ -76,9 +72,10 @@ type Conf struct {
|
||||
ConnectTo string `yaml:"connect_to"`
|
||||
}
|
||||
Build struct {
|
||||
Worker int
|
||||
Makej int
|
||||
Checks bool
|
||||
Worker int
|
||||
Makej int
|
||||
Checks bool
|
||||
SlowQueueThreshold int `yaml:"slow_queue_threshold"`
|
||||
}
|
||||
Logging struct {
|
||||
Level string
|
||||
@@ -161,17 +158,110 @@ func containsSubStr(str string, subList []string) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func cleanBuildDir(dir string) error {
|
||||
if _, err := os.Stat(dir); err == nil {
|
||||
func cleanBuildDir(dir string, chrootDir string) error {
|
||||
if stat, err := os.Stat(dir); err == nil && stat.IsDir() {
|
||||
err = os.RemoveAll(dir)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if chrootDir != "" {
|
||||
if stat, err := os.Stat(chrootDir); err == nil && stat.IsDir() {
|
||||
err = os.RemoveAll(chrootDir)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *BuildManager) queue(path string) ([]*ProtoPackage, error) {
|
||||
unsortedQueue, err := genQueue(path)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error building queue: %w", err)
|
||||
}
|
||||
|
||||
sort.Slice(unsortedQueue, func(i, j int) bool {
|
||||
return unsortedQueue[i].Priority() < unsortedQueue[j].Priority()
|
||||
})
|
||||
|
||||
return unsortedQueue, nil
|
||||
}
|
||||
|
||||
func (b *BuildManager) buildQueue(queue []*ProtoPackage, ctx context.Context) error {
|
||||
for _, pkg := range queue {
|
||||
if err := b.sem.Acquire(ctx, 1); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
go func(pkg *ProtoPackage) {
|
||||
defer b.sem.Release(1)
|
||||
dur, err := pkg.build(ctx)
|
||||
if err != nil {
|
||||
log.Warningf("error building package %s->%s->%s in %s: %s", pkg.March, pkg.FullRepo, pkg.Pkgbase, dur, err)
|
||||
b.repoPurge[pkg.FullRepo] <- []*ProtoPackage{pkg}
|
||||
}
|
||||
}(pkg)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func genQueue(path string) ([]*ProtoPackage, error) {
|
||||
pkgBuilds, err := Glob(path)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error scanning for PKGBUILDs: %w", err)
|
||||
}
|
||||
|
||||
var pkgbuilds []*ProtoPackage
|
||||
for _, pkgbuild := range pkgBuilds {
|
||||
mPkgbuild := PKGBUILD(pkgbuild)
|
||||
if mPkgbuild.FullRepo() == "trunk" || !contains(conf.Repos, mPkgbuild.Repo()) || containsSubStr(mPkgbuild.FullRepo(), conf.Blacklist.Repo) {
|
||||
continue
|
||||
}
|
||||
|
||||
for _, march := range conf.March {
|
||||
dbPkg, dbErr := db.DbPackage.Query().Where(
|
||||
dbpackage.And(
|
||||
dbpackage.Pkgbase(mPkgbuild.PkgBase()),
|
||||
dbpackage.RepositoryEQ(dbpackage.Repository(mPkgbuild.Repo())),
|
||||
dbpackage.March(march),
|
||||
),
|
||||
).Only(context.Background())
|
||||
|
||||
if ent.IsNotFound(dbErr) {
|
||||
log.Debugf("[%s/%s] Package not found in database", mPkgbuild.Repo(), mPkgbuild.PkgBase())
|
||||
} else if err != nil {
|
||||
log.Errorf("[%s/%s] Problem querying db for package: %v", mPkgbuild.Repo(), mPkgbuild.PkgBase(), dbErr)
|
||||
}
|
||||
|
||||
// compare b3sum of PKGBUILD file to hash in database, only proceed if hash differs
|
||||
// reduces the amount of PKGBUILDs that need to be parsed with makepkg, which is _really_ slow, significantly
|
||||
b3s, err := b3sum(pkgbuild)
|
||||
if err != nil {
|
||||
log.Fatalf("Error hashing PKGBUILD: %v", err)
|
||||
}
|
||||
|
||||
if dbPkg != nil && b3s == dbPkg.Hash {
|
||||
log.Debugf("[%s/%s] Skipped: PKGBUILD hash matches db (%s)", mPkgbuild.Repo(), mPkgbuild.PkgBase(), b3s)
|
||||
continue
|
||||
}
|
||||
|
||||
pkgbuilds = append(pkgbuilds, &ProtoPackage{
|
||||
Pkgbuild: pkgbuild,
|
||||
Pkgbase: mPkgbuild.PkgBase(),
|
||||
Repo: dbpackage.Repository(mPkgbuild.Repo()),
|
||||
March: march,
|
||||
FullRepo: mPkgbuild.Repo() + "-" + march,
|
||||
Hash: b3s,
|
||||
})
|
||||
}
|
||||
}
|
||||
return pkgbuilds, nil
|
||||
}
|
||||
|
||||
func movePackagesLive(fullRepo string) error {
|
||||
if _, err := os.Stat(filepath.Join(conf.Basedir.Work, waitingDir, fullRepo)); os.IsNotExist(err) {
|
||||
return nil
|
||||
@@ -576,11 +666,6 @@ func syncMarchs() error {
|
||||
log.Fatalf("Can't generate makepkg for %s: %v", march, err)
|
||||
}
|
||||
|
||||
buildManager.build[march] = make(chan *ProtoPackage, 10000)
|
||||
for i := 0; i < conf.Build.Worker; i++ {
|
||||
go buildManager.buildWorker(i, march)
|
||||
}
|
||||
|
||||
for _, repo := range conf.Repos {
|
||||
fRepo := fmt.Sprintf("%s-%s", repo, march)
|
||||
repos = append(repos, fRepo)
|
||||
|
Reference in New Issue
Block a user