diff --git a/config.yaml b/config.yaml index f245ddb..81ae077 100644 --- a/config.yaml +++ b/config.yaml @@ -1,6 +1,9 @@ logging: level: DEBUG +parser: + worker: 6 + steam: username: steamusername api_key: apikey diff --git a/csgo/demo_loader.go b/csgo/demo_loader.go index a31b527..4891c48 100644 --- a/csgo/demo_loader.go +++ b/csgo/demo_loader.go @@ -206,10 +206,13 @@ func (d *DemoMatchLoader) steamEventHandler() { go d.SetPlaying() case *steam.LogOnFailedEvent: log.Warningf("[DL] Steam login denied: %+v", e) - log.Warningf("[DL] Asking for auth code now, please provide on stdin.") - scanner := bufio.NewScanner(os.Stdin) - scanner.Scan() - d.steamLogin.AuthCode = scanner.Text() + switch e.Result { + case steamlang.EResult_AccountLogonDenied: + log.Warningf("[DL] Asking for auth code now, please provide on stdin.") + scanner := bufio.NewScanner(os.Stdin) + scanner.Scan() + d.steamLogin.AuthCode = scanner.Text() + } case *steam.DisconnectedEvent: log.Warningf("Steam disconnected, trying to reconnect...") _, err := d.client.Connect() diff --git a/csgo/demo_parser.go b/csgo/demo_parser.go index 0834daa..171c5c8 100644 --- a/csgo/demo_parser.go +++ b/csgo/demo_parser.go @@ -29,15 +29,21 @@ type Demo struct { type DemoParser struct { demoQueue chan *Demo tempDir string + db *ent.Client + lock *sync.RWMutex } type DemoNotFoundError struct { error } -func (p *DemoParser) Setup(db *ent.Client, lock *sync.RWMutex) error { +func (p *DemoParser) Setup(db *ent.Client, lock *sync.RWMutex, worker int) error { p.demoQueue = make(chan *Demo, 1000) - go p.parseWorker(db, lock) + p.db = db + p.lock = lock + for i := 0; i < worker; i++ { + go p.parseWorker() + } return nil } @@ -65,14 +71,14 @@ func (p *DemoParser) downloadReplay(demo *Demo) (io.Reader, error) { } -func (p *DemoParser) getDBPlayer(db *ent.Client, lock *sync.RWMutex, demo *Demo, demoPlayer *common.Player) (*ent.Stats, error) { - lock.RLock() - tMatchPlayer, err := db.Stats.Query().WithMatches(func(q *ent.MatchQuery) { +func (p *DemoParser) getDBPlayer(demo *Demo, demoPlayer *common.Player) (*ent.Stats, error) { + p.lock.RLock() + tMatchPlayer, err := p.db.Stats.Query().WithMatches(func(q *ent.MatchQuery) { q.Where(match.MatchID(demo.MatchId)) }).WithPlayers(func(q *ent.PlayerQuery) { q.Where(player.Steamid(demoPlayer.SteamID64)) }).Only(context.Background()) - lock.RUnlock() + p.lock.RUnlock() if err != nil { return nil, err } @@ -95,7 +101,7 @@ func (p *DemoParser) getMatchPlayerBySteamID(stats []*ent.Stats, steamId uint64) return nil } -func (p *DemoParser) parseWorker(db *ent.Client, lock *sync.RWMutex) { +func (p *DemoParser) parseWorker() { for { select { case demo := <-p.demoQueue: @@ -104,9 +110,9 @@ func (p *DemoParser) parseWorker(db *ent.Client, lock *sync.RWMutex) { continue } - lock.RLock() - tMatch, err := db.Match.Query().Where(match.MatchID(demo.MatchId)).Only(context.Background()) - lock.RUnlock() + p.lock.RLock() + tMatch, err := p.db.Match.Query().Where(match.MatchID(demo.MatchId)).Only(context.Background()) + p.lock.RUnlock() if err != nil { log.Errorf("[DP] Unable to get match %d: %v", demo.MatchId, err) continue @@ -136,9 +142,9 @@ func (p *DemoParser) parseWorker(db *ent.Client, lock *sync.RWMutex) { } downloadTime := time.Now().Sub(startTime) - lock.RLock() + p.lock.RLock() tStats, err := tMatch.QueryStats().WithPlayers().All(context.Background()) - lock.RUnlock() + p.lock.RUnlock() if err != nil { log.Errorf("[DP] Failed to find players for match %d: %v", demo.MatchId, err) continue @@ -292,18 +298,18 @@ func (p *DemoParser) parseWorker(db *ent.Client, lock *sync.RWMutex) { continue } - lock.Lock() + p.lock.Lock() err = tMatch.Update().SetMap(demoParser.Header().MapName).SetDemoParsed(true).Exec(context.Background()) - lock.Unlock() + p.lock.Unlock() if err != nil { log.Errorf("[DP] Unable to update match %d in database: %v", demo.MatchId, err) continue } for _, tMatchPlayer := range tStats { - lock.Lock() + p.lock.Lock() err := tMatchPlayer.Update().SetExtended(tMatchPlayer.Extended).Exec(context.Background()) - lock.Unlock() + p.lock.Unlock() if err != nil { log.Errorf("[DP] Unable to update player %d in database: %v", tMatchPlayer.Edges.Players.Steamid, err) continue diff --git a/main.go b/main.go index 58b8288..c4c6e66 100644 --- a/main.go +++ b/main.go @@ -389,7 +389,7 @@ func main() { log.Info("GC ready, starting HTTP server") sendGC = make(chan *csgo.Demo, 100) - utils.Check(demoParser.Setup(db.Client, db.Lock)) + utils.Check(demoParser.Setup(db.Client, db.Lock, conf.Parser.Worker)) go utils.GCInfoParser(sendGC, demoLoader, demoParser, db, conf.Steam.APIKey, rL) go housekeeping() diff --git a/utils/utils.go b/utils/utils.go index 9e1d3d1..724686c 100644 --- a/utils/utils.go +++ b/utils/utils.go @@ -26,6 +26,9 @@ type Conf struct { Logging struct { Level string } + Parser struct { + Worker int + } Steam struct { Username string APIKey string `yaml:"api_key"` @@ -151,7 +154,7 @@ func getNextShareCode(lastCode string, apiKey string, authCode string, steamId u if r.StatusCode == 202 { return "n/a", nil } else if r.StatusCode != 200 { - return "", fmt.Errorf("bad response from steam api (HTTP%d)", r.StatusCode) + return "", fmt.Errorf("bad response from steam api (HTTP %d)", r.StatusCode) } defer r.Body.Close()