some refactoring

This commit is contained in:
2021-10-30 08:22:05 +02:00
parent 37233be336
commit 268793f0e5
2 changed files with 388 additions and 390 deletions

View File

@@ -16,6 +16,7 @@ import (
"google.golang.org/protobuf/proto"
"io/ioutil"
"os"
"sync"
"time"
)
@@ -48,6 +49,7 @@ type DemoMatchLoader struct {
dp *DemoParser
parseDemo chan *Demo
parseMap map[string]bool
parseMapL *sync.Mutex
cache *cache.Cache
connecting bool
}
@@ -156,6 +158,7 @@ func (d *DemoMatchLoader) Setup(config *DemoMatchLoaderConfig) error {
d.db = config.Db
d.dp = &DemoParser{}
d.parseMap = map[string]bool{}
d.parseMapL = new(sync.Mutex)
d.cache = config.Cache
err := d.dp.Setup(config.Db, config.Worker)
if err != nil {
@@ -298,15 +301,16 @@ func (d *DemoMatchLoader) requestDemoInfo(matchId uint64, conclusionId uint64, t
}
func (d *DemoMatchLoader) gcWorker(apiKey string, rl ratelimit.Limiter) {
for {
select {
case demo := <-d.parseDemo:
for demo := range d.parseDemo {
d.parseMapL.Lock()
if _, ok := d.parseMap[demo.ShareCode]; ok {
log.Infof("[DL] Skipping %s: parsing in progress", demo.ShareCode)
d.parseMapL.Unlock()
continue
} else {
d.parseMap[demo.ShareCode] = true
}
d.parseMapL.Unlock()
if !d.GCReady {
log.Infof("[DL] Postponing match %d (%s): GC not ready", demo.MatchId, demo.ShareCode)
@@ -465,5 +469,4 @@ func (d *DemoMatchLoader) gcWorker(apiKey string, rl ratelimit.Limiter) {
}
delete(d.parseMap, demo.ShareCode)
}
}
}

View File

@@ -51,7 +51,7 @@ func (p *DemoParser) ParseDemo(demo *Demo) error {
}
}
func (p *DemoParser) downloadReplay(demo *Demo) (io.Reader, error) {
func (p *DemoParser) downloadDecompressReplay(demo *Demo) (io.Reader, error) {
log.Debugf("[DP] Downloading replay for %d", demo.MatchId)
r, err := http.Get(demo.Url)
@@ -63,7 +63,6 @@ func (p *DemoParser) downloadReplay(demo *Demo) (io.Reader, error) {
return nil, DemoNotFoundError{fmt.Errorf("demo not found")}
}
return bzip2.NewReader(r.Body), nil
}
func (p *DemoParser) getDBPlayer(demo *Demo, demoPlayer *common.Player) (*ent.Stats, error) {
@@ -95,9 +94,7 @@ func (p *DemoParser) getMatchPlayerBySteamID(stats []*ent.Stats, steamId uint64)
}
func (p *DemoParser) parseWorker() {
for {
select {
case demo := <-p.demoQueue:
for demo := range p.demoQueue {
if demo.MatchId == 0 {
log.Warningf("[DP] can't parse match %s: no matchid found", demo.ShareCode)
continue
@@ -115,18 +112,17 @@ func (p *DemoParser) parseWorker() {
}
startTime := time.Now()
fDemo, err := p.downloadReplay(demo)
fDemo, err := p.downloadDecompressReplay(demo)
if err != nil {
switch e := err.(type) {
case DemoNotFoundError:
if _, ok := err.(DemoNotFoundError); ok {
if tMatch.Date.Before(time.Now().UTC().AddDate(0, 0, -30)) {
log.Infof("[DP] demo expired for match %d", tMatch.ID)
} else {
log.Infof("[DP] demo 404 not found for match %d. Trying again later.", demo.MatchId)
}
continue
default:
log.Errorf("[DP] Unable to download demo for %d: %v", demo.MatchId, e)
} else {
log.Errorf("[DP] Unable to download demo for %d: %v", demo.MatchId, err)
continue
}
}
@@ -343,5 +339,4 @@ func (p *DemoParser) parseWorker() {
log.Errorf("[DP] Unable close demo file for match %d: %v", demo.MatchId, err)
}
}
}
}