diff --git a/csgo/demo_loader.go b/csgo/demo_loader.go index 0d9d225..32cb5fe 100644 --- a/csgo/demo_loader.go +++ b/csgo/demo_loader.go @@ -51,8 +51,8 @@ type DemoMatchLoader struct { db *ent.Client dp *DemoParser parseDemo chan *Demo - ParseMap map[string]bool - ParseMapL *sync.Mutex + parseMap map[string]bool + parseMapL *sync.RWMutex cache *cache.Cache connecting bool } @@ -75,7 +75,28 @@ func playerStatsFromRound(round *protobuf.CMsgGCCStrike15V2_MatchmakingServerRou return 0, 0, 0, 0, 0, 0 } -func (d *DemoMatchLoader) HandleGCPacket(pkg *gamecoordinator.GCPacket) { +func (dml *DemoMatchLoader) IsLoading(demo *Demo) bool { + dml.parseMapL.RLock() + defer dml.parseMapL.RUnlock() + if _, ok := dml.parseMap[demo.ShareCode]; ok { + return true + } + return false +} + +func (dml *DemoMatchLoader) unlockDemo(demo *Demo) { + dml.parseMapL.Lock() + defer dml.parseMapL.Unlock() + delete(dml.parseMap, demo.ShareCode) +} + +func (dml *DemoMatchLoader) lockDemo(demo *Demo) { + dml.parseMapL.Lock() + defer dml.parseMapL.Unlock() + dml.parseMap[demo.ShareCode] = true +} + +func (dml *DemoMatchLoader) HandleGCPacket(pkg *gamecoordinator.GCPacket) { switch pkg.MsgType { case uint32(protobuf.EGCBaseClientMsg_k_EMsgGCClientWelcome): msg := &protobuf.CMsgClientWelcome{} @@ -84,7 +105,7 @@ func (d *DemoMatchLoader) HandleGCPacket(pkg *gamecoordinator.GCPacket) { log.Errorf("[DL] Unable to unmarshal event %v: %v", pkg.MsgType, err) } log.Debugf("[GC] Welcome: %+v", msg) - d.GCReady = true + dml.GCReady = true case uint32(protobuf.EGCBaseClientMsg_k_EMsgGCClientConnectionStatus): msg := &protobuf.CMsgConnectionStatus{} err := proto.Unmarshal(pkg.Body, msg) @@ -94,8 +115,8 @@ func (d *DemoMatchLoader) HandleGCPacket(pkg *gamecoordinator.GCPacket) { log.Debugf("[GC] Status: %+v", msg) if msg.GetStatus() != protobuf.GCConnectionStatus_GCConnectionStatus_HAVE_SESSION { - d.GCReady = false - go d.greetGC() + dml.GCReady = false + go dml.greetGC() } case uint32(protobuf.ECsgoGCMsg_k_EMsgGCCStrike15_v2_GC2ClientGlobalStats): msg := &protobuf.GlobalStatistics{} @@ -104,21 +125,21 @@ func (d *DemoMatchLoader) HandleGCPacket(pkg *gamecoordinator.GCPacket) { log.Errorf("[DL] Unable to unmarshal event %v: %v", pkg.MsgType, err) } log.Debugf("[GC] Stats: %+v", msg) - d.GCReady = true + dml.GCReady = true case uint32(protobuf.ECsgoGCMsg_k_EMsgGCCStrike15_v2_MatchList): msg := &protobuf.CMsgGCCStrike15V2_MatchList{} err := proto.Unmarshal(pkg.Body, msg) if err != nil { log.Errorf("[DL] Unable to unmarshal event %v: %v", pkg.MsgType, err) } - d.matchRecv <- msg + dml.matchRecv <- msg default: log.Debugf("[GC] Unhandled GC message: %+v", pkg) } } -func (d *DemoMatchLoader) getMatchDetails(sharecode string) (*protobuf.CMsgGCCStrike15V2_MatchList, error) { - if !d.GCReady { +func (dml *DemoMatchLoader) getMatchDetails(sharecode string) (*protobuf.CMsgGCCStrike15V2_MatchList, error) { + if !dml.GCReady { return nil, fmt.Errorf("gc not ready") } @@ -126,142 +147,143 @@ func (d *DemoMatchLoader) getMatchDetails(sharecode string) (*protobuf.CMsgGCCSt if err != nil { return nil, err } - err = d.requestDemoInfo(matchId, outcomeId, uint32(tokenId)) + err = dml.requestDemoInfo(matchId, outcomeId, uint32(tokenId)) if err != nil { return nil, err } for { select { - case matchDetails := <-d.matchRecv: + case matchDetails := <-dml.matchRecv: if *matchDetails.Matches[0].Matchid == matchId { return matchDetails, nil } else { - d.matchRecv <- matchDetails + dml.matchRecv <- matchDetails } } } } -func (d *DemoMatchLoader) connectToSteam() error { - if d.client.Connected() { +func (dml *DemoMatchLoader) connectToSteam() error { + if dml.client.Connected() { return nil } - _, err := d.client.Connect() + _, err := dml.client.Connect() if err != nil { return err } return nil } -func (d *DemoMatchLoader) Setup(config *DemoMatchLoaderConfig) error { - d.loginKey = config.LoginKey - d.sentryFile = config.Sentry - d.db = config.Db - d.dp = &DemoParser{} - d.ParseMap = map[string]bool{} - d.ParseMapL = new(sync.Mutex) - d.cache = config.Cache - err := d.dp.Setup(config.Db, config.Worker, config.SprayTimeout) +func (dml *DemoMatchLoader) Setup(config *DemoMatchLoaderConfig) error { + dml.loginKey = config.LoginKey + dml.sentryFile = config.Sentry + dml.db = config.Db + dml.dp = &DemoParser{} + dml.parseMap = map[string]bool{} + dml.parseMapL = new(sync.RWMutex) + dml.cache = config.Cache + err := dml.dp.Setup(config.Db, config.Worker, config.SprayTimeout) if err != nil { return err } - d.steamLogin = new(steam.LogOnDetails) - d.steamLogin.Username = config.Username - d.steamLogin.Password = config.Password - d.steamLogin.AuthCode = config.AuthCode - d.steamLogin.ShouldRememberPassword = true + dml.steamLogin = new(steam.LogOnDetails) + dml.steamLogin.Username = config.Username + dml.steamLogin.Password = config.Password + dml.steamLogin.AuthCode = config.AuthCode + dml.steamLogin.ShouldRememberPassword = true - if _, err := os.Stat(d.sentryFile); err == nil { - hash, err := ioutil.ReadFile(d.sentryFile) + if _, err := os.Stat(dml.sentryFile); err == nil { + hash, err := ioutil.ReadFile(dml.sentryFile) if err != nil { return err } - d.steamLogin.SentryFileHash = hash + dml.steamLogin.SentryFileHash = hash } - if _, err := os.Stat(d.loginKey); err == nil { - hash, err := ioutil.ReadFile(d.loginKey) + if _, err := os.Stat(dml.loginKey); err == nil { + hash, err := ioutil.ReadFile(dml.loginKey) if err != nil { return err } - d.steamLogin.LoginKey = string(hash) + dml.steamLogin.LoginKey = string(hash) } - d.client = steam.NewClient() + dml.client = steam.NewClient() err = steam.InitializeSteamDirectory() if err != nil { return err } - d.matchRecv = make(chan *protobuf.CMsgGCCStrike15V2_MatchList, 1000) - d.parseDemo = make(chan *Demo, 1000) + dml.matchRecv = make(chan *protobuf.CMsgGCCStrike15V2_MatchList, 1000) + dml.parseDemo = make(chan *Demo, 1000) - go d.connectLoop() - go d.steamEventHandler() + go dml.connectLoop() + go dml.steamEventHandler() + go dml.demoWorker() for i := 0; i < config.Worker; i++ { - go d.gcWorker(config.ApiKey, config.RateLimit) + go dml.gcWorker(config.ApiKey, config.RateLimit) } return nil } -func (d DemoMatchLoader) LoadDemo(demo *Demo) error { +func (dml DemoMatchLoader) LoadDemo(demo *Demo) error { select { - case d.parseDemo <- demo: + case dml.parseDemo <- demo: return nil default: return fmt.Errorf("queue full") } } -func (d DemoMatchLoader) connectLoop() { - if !d.connecting { - d.connecting = true - for d.connectToSteam() != nil { +func (dml DemoMatchLoader) connectLoop() { + if !dml.connecting { + dml.connecting = true + for dml.connectToSteam() != nil { log.Infof("[DL] Retrying connecting to steam...") time.Sleep(time.Minute * 10) } } } -func (d *DemoMatchLoader) steamEventHandler() { - for event := range d.client.Events() { +func (dml *DemoMatchLoader) steamEventHandler() { + for event := range dml.client.Events() { switch e := event.(type) { case *steam.ConnectedEvent: log.Debug("[DL] Connected!") - d.client.Auth.LogOn(d.steamLogin) + dml.client.Auth.LogOn(dml.steamLogin) case *steam.MachineAuthUpdateEvent: log.Debug("[DL] Got sentry!") - err := ioutil.WriteFile(d.sentryFile, e.Hash, os.ModePerm) + err := ioutil.WriteFile(dml.sentryFile, e.Hash, os.ModePerm) if err != nil { log.Errorf("[DL] Unable write sentry file: %v", err) } case *steam.LoggedOnEvent: log.Debug("[DL] Login successfully!") - d.client.Social.SetPersonaState(steamlang.EPersonaState_Online) - go d.setPlaying() + dml.client.Social.SetPersonaState(steamlang.EPersonaState_Online) + go dml.setPlaying() case *steam.LogOnFailedEvent: log.Warningf("[DL] Steam login denied: %+v", e) switch e.Result { case steamlang.EResult_AccountLogonDenied: log.Fatalf("[DL] Please provide AuthCode in config") case steamlang.EResult_InvalidPassword: - _ = os.Remove(d.sentryFile) - _ = os.Remove(d.loginKey) + _ = os.Remove(dml.sentryFile) + _ = os.Remove(dml.loginKey) log.Warningf("[DL] Steam login wrong") - go d.connectLoop() + go dml.connectLoop() case steamlang.EResult_InvalidLoginAuthCode: log.Fatalf("[DL] Steam auth code wrong") } case *steam.DisconnectedEvent: log.Warningf("Steam disconnected, trying to reconnect...") - go d.connectLoop() + go dml.connectLoop() case *steam.LoginKeyEvent: log.Debug("Got login_key!") - err := ioutil.WriteFile(d.loginKey, []byte(e.LoginKey), os.ModePerm) + err := ioutil.WriteFile(dml.loginKey, []byte(e.LoginKey), os.ModePerm) if err != nil { log.Errorf("[DL] Unable write login_key: %v", err) } @@ -275,23 +297,23 @@ func (d *DemoMatchLoader) steamEventHandler() { } } -func (d *DemoMatchLoader) setPlaying() { - d.client.GC.SetGamesPlayed(APPID) - d.client.GC.RegisterPacketHandler(d) - go d.greetGC() +func (dml *DemoMatchLoader) setPlaying() { + dml.client.GC.SetGamesPlayed(APPID) + dml.client.GC.RegisterPacketHandler(dml) + go dml.greetGC() } -func (d *DemoMatchLoader) greetGC() { - for !d.GCReady { +func (dml *DemoMatchLoader) greetGC() { + for !dml.GCReady { log.Debugf("[DL] Sending GC greeting") msg := protobuf.CMsgClientHello{} - d.client.GC.Write(gamecoordinator.NewGCMsgProtobuf(APPID, uint32(protobuf.EGCBaseClientMsg_k_EMsgGCClientHello), &msg)) + dml.client.GC.Write(gamecoordinator.NewGCMsgProtobuf(APPID, uint32(protobuf.EGCBaseClientMsg_k_EMsgGCClientHello), &msg)) time.Sleep(500 * time.Millisecond) } } -func (d *DemoMatchLoader) requestDemoInfo(matchId uint64, conclusionId uint64, tokenId uint32) error { - if !d.GCReady { +func (dml *DemoMatchLoader) requestDemoInfo(matchId uint64, conclusionId uint64, tokenId uint32) error { + if !dml.GCReady { return fmt.Errorf("gc not ready") } @@ -299,52 +321,48 @@ func (d *DemoMatchLoader) requestDemoInfo(matchId uint64, conclusionId uint64, t Outcomeid: &conclusionId, Token: &tokenId} - d.client.GC.Write(gamecoordinator.NewGCMsgProtobuf(APPID, uint32(protobuf.ECsgoGCMsg_k_EMsgGCCStrike15_v2_MatchListRequestFullGameInfo), &msg)) + dml.client.GC.Write(gamecoordinator.NewGCMsgProtobuf(APPID, uint32(protobuf.ECsgoGCMsg_k_EMsgGCCStrike15_v2_MatchListRequestFullGameInfo), &msg)) return nil } -func (d *DemoMatchLoader) gcWorker(apiKey string, rl ratelimit.Limiter) { - for demo := range d.parseDemo { - d.ParseMapL.Lock() - if _, ok := d.ParseMap[demo.ShareCode]; ok { - log.Infof("[DL] Skipping %s: parsing in progress", demo.ShareCode) - d.ParseMapL.Unlock() - continue - } else { - d.ParseMap[demo.ShareCode] = true - } - d.ParseMapL.Unlock() +func (dml *DemoMatchLoader) demoWorker() { + for demo := range dml.dp.Done { + dml.unlockDemo(demo) + } +} - if !d.GCReady { +func (dml *DemoMatchLoader) gcWorker(apiKey string, rl ratelimit.Limiter) { + for demo := range dml.parseDemo { + if !dml.IsLoading(demo) { + log.Infof("[DL] Skipping %s: parsing in progress", demo.ShareCode) + continue + } + dml.lockDemo(demo) + + if !dml.GCReady { log.Infof("[DL] Postponing match %d (%s): GC not ready", demo.MatchId, demo.ShareCode) time.Sleep(5 * time.Second) - d.ParseMapL.Lock() - delete(d.ParseMap, demo.ShareCode) - d.ParseMapL.Unlock() - d.parseDemo <- demo + dml.unlockDemo(demo) + dml.parseDemo <- demo continue } matchId, _, _, err := DecodeSharecode(demo.ShareCode) if err != nil || matchId == 0 { log.Warningf("[DL] Can't parse match with sharecode %s: %v", demo.ShareCode, err) - d.ParseMapL.Lock() - delete(d.ParseMap, demo.ShareCode) - d.ParseMapL.Unlock() + dml.unlockDemo(demo) continue } - iMatch, err := d.db.Match.Get(context.Background(), matchId) + iMatch, err := dml.db.Match.Get(context.Background(), matchId) if err != nil { switch e := err.(type) { case *ent.NotFoundError: break default: log.Errorf("[DL] Failure trying to lookup match %d in db: %v", matchId, e) - d.ParseMapL.Lock() - delete(d.ParseMap, demo.ShareCode) - d.ParseMapL.Unlock() + dml.unlockDemo(demo) continue } } else { @@ -353,32 +371,26 @@ func (d *DemoMatchLoader) gcWorker(apiKey string, rl ratelimit.Limiter) { demo.MatchId = matchId demo.Url = iMatch.ReplayURL demo.DecryptionKey = iMatch.DecryptionKey - err := d.dp.ParseDemo(demo) + err := dml.dp.ParseDemo(demo) if err != nil { log.Warningf("[DL] Parsing demo from match %d failed: %v", demo.MatchId, err) + dml.unlockDemo(demo) } - d.ParseMapL.Lock() - delete(d.ParseMap, demo.ShareCode) - d.ParseMapL.Unlock() continue } log.Infof("[DL] Skipped match %d: already loaded", matchId) - d.ParseMapL.Lock() - delete(d.ParseMap, demo.ShareCode) - d.ParseMapL.Unlock() + dml.unlockDemo(demo) continue } log.Infof("[DL] Requesting match %d from GC", matchId) t := time.Now() - matchDetails, err := d.getMatchDetails(demo.ShareCode) + matchDetails, err := dml.getMatchDetails(demo.ShareCode) if err != nil { log.Warningf("[DL] Failure to get match-details for %d from GC: %v", demo.MatchId, err) - d.ParseMapL.Lock() - delete(d.ParseMap, demo.ShareCode) - d.ParseMapL.Unlock() + dml.unlockDemo(demo) continue } @@ -389,7 +401,7 @@ func (d *DemoMatchLoader) gcWorker(apiKey string, rl ratelimit.Limiter) { var players []*ent.Player for _, accountId := range lastRound.GetReservation().GetAccountIds() { - tPlayer, err := utils.Player(d.db, AccountId2SteamId(accountId), apiKey, rl) + tPlayer, err := utils.Player(dml.db, AccountId2SteamId(accountId), apiKey, rl) if err != nil { log.Warningf("[DL] Unable to get player for steamid %d: %v", AccountId2SteamId(accountId), err) continue @@ -401,7 +413,7 @@ func (d *DemoMatchLoader) gcWorker(apiKey string, rl ratelimit.Limiter) { demo.MatchId = matchZero.GetMatchid() demo.DecryptionKey = []byte(strings.ToUpper(strconv.FormatUint(matchZero.GetWatchablematchinfo().GetClDecryptdataKeyPub(), 16))) - tMatch, err := d.db.Match.Create(). + tMatch, err := dml.db.Match.Create(). SetID(matchZero.GetMatchid()). AddPlayers(players...). SetDate(time.Unix(int64(matchZero.GetMatchtime()), 0).UTC()). @@ -416,9 +428,7 @@ func (d *DemoMatchLoader) gcWorker(apiKey string, rl ratelimit.Limiter) { Save(context.Background()) if err != nil { log.Warningf("[DL] Unable to create match %d: %v", matchZero.GetMatchid(), err) - d.ParseMapL.Lock() - delete(d.ParseMap, demo.ShareCode) - d.ParseMapL.Unlock() + dml.unlockDemo(demo) continue } @@ -457,7 +467,7 @@ func (d *DemoMatchLoader) gcWorker(apiKey string, rl ratelimit.Limiter) { } kills, deaths, assists, hs, score, mvp := playerStatsFromRound(lastRound, mPlayer) - err := d.db.MatchPlayer.Create(). + err := dml.db.MatchPlayer.Create(). SetMatches(tMatch). SetPlayers(mPlayer). SetTeamID(teamId). @@ -479,7 +489,7 @@ func (d *DemoMatchLoader) gcWorker(apiKey string, rl ratelimit.Limiter) { // clear cache or regen values player for _, p := range players { - err = d.cache.Delete(context.Background(), fmt.Sprintf(utils.SideMetaCacheKey, p.ID)) + err = dml.cache.Delete(context.Background(), fmt.Sprintf(utils.SideMetaCacheKey, p.ID)) if err != nil { log.Warningf("[DL] Unable to delete cache key %s: %v", fmt.Sprintf(utils.SideMetaCacheKey, p.ID), err) } @@ -495,12 +505,10 @@ func (d *DemoMatchLoader) gcWorker(apiKey string, rl ratelimit.Limiter) { } } - err = d.dp.ParseDemo(demo) + err = dml.dp.ParseDemo(demo) if err != nil { log.Warningf("[DL] Can't queue demo %d for parsing: %v", demo.MatchId, err) + dml.unlockDemo(demo) } - d.ParseMapL.Lock() - delete(d.ParseMap, demo.ShareCode) - d.ParseMapL.Unlock() } } diff --git a/csgo/demo_parser.go b/csgo/demo_parser.go index 082c696..f72afd0 100644 --- a/csgo/demo_parser.go +++ b/csgo/demo_parser.go @@ -32,6 +32,7 @@ type DemoParser struct { tempDir string db *ent.Client sprayTimeout int + Done chan *Demo } type Encounter struct { @@ -105,19 +106,20 @@ func (s *Sprays) Avg() (avg [][]float32) { return } -func (p *DemoParser) Setup(db *ent.Client, worker int, sprayTimeout int) error { - p.demoQueue = make(chan *Demo, 1000) - p.db = db - p.sprayTimeout = sprayTimeout +func (dp *DemoParser) Setup(db *ent.Client, worker int, sprayTimeout int) error { + dp.demoQueue = make(chan *Demo, 1000) + dp.db = db + dp.sprayTimeout = sprayTimeout + dp.Done = make(chan *Demo, worker) for i := 0; i < worker; i++ { - go p.parseWorker() + go dp.parseWorker() } return nil } -func (p *DemoParser) ParseDemo(demo *Demo) error { +func (dp *DemoParser) ParseDemo(demo *Demo) error { select { - case p.demoQueue <- demo: + case dp.demoQueue <- demo: return nil default: return fmt.Errorf("queue full") @@ -138,8 +140,8 @@ func (d *Demo) download() (io.Reader, error) { return bzip2.NewReader(r.Body), nil } -func (p *DemoParser) getDBPlayer(demo *Demo, demoPlayer *common.Player) (*ent.MatchPlayer, error) { - tMatchPlayer, err := p.db.MatchPlayer.Query().WithMatches(func(q *ent.MatchQuery) { +func (dp *DemoParser) getDBPlayer(demo *Demo, demoPlayer *common.Player) (*ent.MatchPlayer, error) { + tMatchPlayer, err := dp.db.MatchPlayer.Query().WithMatches(func(q *ent.MatchQuery) { q.Where(match.ID(demo.MatchId)) }).WithPlayers(func(q *ent.PlayerQuery) { q.Where(player.ID(demoPlayer.SteamID64)) @@ -151,7 +153,7 @@ func (p *DemoParser) getDBPlayer(demo *Demo, demoPlayer *common.Player) (*ent.Ma return tMatchPlayer, nil } -func (p *DemoParser) MatchPlayerBySteamID(stats []*ent.MatchPlayer, steamId uint64) (*ent.MatchPlayer, error) { +func (dp *DemoParser) MatchPlayerBySteamID(stats []*ent.MatchPlayer, steamId uint64) (*ent.MatchPlayer, error) { for _, tStats := range stats { tPLayer, err := tStats.Edges.PlayersOrErr() if err != nil { @@ -190,21 +192,24 @@ func setMatchPlayerColor(matchPlayer *ent.MatchPlayer, demoPlayer *common.Player } } -func (p *DemoParser) parseWorker() { - for demo := range p.demoQueue { +func (dp *DemoParser) parseWorker() { + for demo := range dp.demoQueue { if demo.MatchId == 0 { log.Warningf("[DP] can't parse match %s: no matchid found", demo.ShareCode) + dp.Done <- demo continue } - tMatch, err := p.db.Match.Get(context.Background(), demo.MatchId) + tMatch, err := dp.db.Match.Get(context.Background(), demo.MatchId) if err != nil { log.Errorf("[DP] Unable to get match %d: %v", demo.MatchId, err) + dp.Done <- demo continue } if tMatch.DemoParsed { log.Infof("[DP] skipped already parsed %d", demo.MatchId) + dp.Done <- demo continue } @@ -217,9 +222,11 @@ func (p *DemoParser) parseWorker() { } else { log.Infof("[DP] demo 404 not found for match %d. Trying again later.", demo.MatchId) } + dp.Done <- demo continue } else { log.Errorf("[DP] Unable to download demo for %d: %v", demo.MatchId, err) + dp.Done <- demo continue } } @@ -228,6 +235,7 @@ func (p *DemoParser) parseWorker() { tStats, err := tMatch.QueryStats().WithPlayers().All(context.Background()) if err != nil { log.Errorf("[DP] Failed to find players for match %d: %v", demo.MatchId, err) + dp.Done <- demo continue } @@ -255,7 +263,7 @@ func (p *DemoParser) parseWorker() { // onChatMessage demoParser.RegisterEventHandler(func(e events.ChatMessage) { gs := demoParser.GameState() - tAttacker, err := p.MatchPlayerBySteamID(tStats, e.Sender.SteamID64) + tAttacker, err := dp.MatchPlayerBySteamID(tStats, e.Sender.SteamID64) if err != nil { log.Warningf("[DP] Unable to get player for id %d: %v", e.Sender.SteamID64, err) return @@ -292,7 +300,7 @@ func (p *DemoParser) parseWorker() { for _, spray := range spays { if e.Shooter.SteamID64 == spray.Sprayer && int(e.Weapon.Type) == spray.Weapon { playerWeaponFound = true - spray.Add(demoParser.CurrentTime(), []float32{e.Shooter.ViewDirectionX(), e.Shooter.ViewDirectionY()}, p.sprayTimeout) + spray.Add(demoParser.CurrentTime(), []float32{e.Shooter.ViewDirectionX(), e.Shooter.ViewDirectionY()}, dp.sprayTimeout) } } @@ -311,7 +319,7 @@ func (p *DemoParser) parseWorker() { return } - tAttacker, err := p.MatchPlayerBySteamID(tStats, e.Attacker.SteamID64) + tAttacker, err := dp.MatchPlayerBySteamID(tStats, e.Attacker.SteamID64) if err != nil { log.Warningf("[DP] Unable to get player for id %d: %v", e.Attacker.SteamID64, err) return @@ -373,7 +381,7 @@ func (p *DemoParser) parseWorker() { return } - tAttacker, err := p.MatchPlayerBySteamID(tStats, e.Attacker.SteamID64) + tAttacker, err := dp.MatchPlayerBySteamID(tStats, e.Attacker.SteamID64) if err != nil { log.Warningf("[DP] Unable to get player for id %d: %v", e.Attacker.SteamID64, err) return @@ -401,7 +409,7 @@ func (p *DemoParser) parseWorker() { // onPlayerConnected demoParser.RegisterEventHandler(func(e events.PlayerTeamChange) { if e.Player != nil && e.Player.SteamID64 != 0 { - tMatchPlayer, err := p.MatchPlayerBySteamID(tStats, e.Player.SteamID64) + tMatchPlayer, err := dp.MatchPlayerBySteamID(tStats, e.Player.SteamID64) if err != nil { log.Warningf("[DP] Unable to get player for id %d: %v", e.Player.SteamID64, err) return @@ -416,7 +424,7 @@ func (p *DemoParser) parseWorker() { for _, demoPlayer := range gs.Participants().Playing() { if demoPlayer != nil && demoPlayer.SteamID64 != 0 { - tMatchPlayer, err := p.MatchPlayerBySteamID(tStats, demoPlayer.SteamID64) + tMatchPlayer, err := dp.MatchPlayerBySteamID(tStats, demoPlayer.SteamID64) if err != nil { log.Warningf("[DP] Unable to get player for id %d: %v", demoPlayer.SteamID64, err) return @@ -429,7 +437,7 @@ func (p *DemoParser) parseWorker() { // onRankUpdate demoParser.RegisterEventHandler(func(e events.RankUpdate) { if e.SteamID64() != 0 { - tMatchPlayer, err := p.MatchPlayerBySteamID(tStats, e.SteamID64()) + tMatchPlayer, err := dp.MatchPlayerBySteamID(tStats, e.SteamID64()) if err != nil { log.Warningf("[DP] Unable to get player for id %d: %v", e.SteamID64(), err) return @@ -443,6 +451,7 @@ func (p *DemoParser) parseWorker() { err = demoParser.ParseToEnd() if err != nil { log.Errorf("[DP] Error parsing replay: %v", err) + dp.Done <- demo continue } @@ -453,6 +462,7 @@ func (p *DemoParser) parseWorker() { Exec(context.Background()) if err != nil { log.Errorf("[DP] Unable to update match %d in database: %v", demo.MatchId, err) + dp.Done <- demo continue } @@ -487,14 +497,14 @@ func (p *DemoParser) parseWorker() { } for _, eqDmg := range eqMap[tMatchPlayer.PlayerStats] { - err := p.db.Weapon.Create().SetStat(nMatchPLayer).SetDmg(eqDmg.Dmg).SetVictim(eqDmg.To).SetHitGroup(eqDmg.HitGroup).SetEqType(eqDmg.Eq).Exec(context.Background()) + err := dp.db.Weapon.Create().SetStat(nMatchPLayer).SetDmg(eqDmg.Dmg).SetVictim(eqDmg.To).SetHitGroup(eqDmg.HitGroup).SetEqType(eqDmg.Eq).Exec(context.Background()) if err != nil { log.Errorf("[DP] Unable to create WeaponStat: %v", err) } } for _, eco := range ecoMap[tMatchPlayer.PlayerStats] { - err := p.db.RoundStats.Create().SetMatchPlayer(nMatchPLayer).SetRound(uint(eco.Round)).SetBank(uint(eco.Bank)).SetEquipment(uint(eco.EqV)).SetSpent(uint(eco.Spent)).Exec(context.Background()) + err := dp.db.RoundStats.Create().SetMatchPlayer(nMatchPLayer).SetRound(uint(eco.Round)).SetBank(uint(eco.Bank)).SetEquipment(uint(eco.EqV)).SetSpent(uint(eco.Spent)).Exec(context.Background()) if err != nil { log.Errorf("[DP] Unable to create RoundStat: %v", err) } @@ -511,7 +521,7 @@ func (p *DemoParser) parseWorker() { continue } - err = p.db.Spray.Create().SetMatchPlayers(nMatchPLayer).SetWeapon(spray.Weapon).SetSpray(sprayBuf.Bytes()).Exec(context.Background()) + err = dp.db.Spray.Create().SetMatchPlayers(nMatchPLayer).SetWeapon(spray.Weapon).SetSpray(sprayBuf.Bytes()).Exec(context.Background()) if err != nil { log.Warningf("[DP] Failure adding spray to database: %v", err) } @@ -521,10 +531,10 @@ func (p *DemoParser) parseWorker() { bulk := make([]*ent.MessagesCreate, 0) for _, msg := range tMatchPlayer.Edges.Messages { - bulk = append(bulk, p.db.Messages.Create().SetMessage(msg.Message).SetAllChat(msg.AllChat).SetTick(msg.Tick).SetMatchPlayer(tMatchPlayer)) + bulk = append(bulk, dp.db.Messages.Create().SetMessage(msg.Message).SetAllChat(msg.AllChat).SetTick(msg.Tick).SetMatchPlayer(tMatchPlayer)) } if len(bulk) > 0 { - err = p.db.Messages.CreateBulk(bulk...).Exec(context.Background()) + err = dp.db.Messages.CreateBulk(bulk...).Exec(context.Background()) if err != nil { log.Warningf("[DP] Failure adding messages to database: %v", err) } @@ -537,5 +547,6 @@ func (p *DemoParser) parseWorker() { if err != nil { log.Errorf("[DP] Unable close demo file for match %d: %v", demo.MatchId, err) } + dp.Done <- demo } } diff --git a/main.go b/main.go index c7104ac..3e77256 100644 --- a/main.go +++ b/main.go @@ -174,18 +174,16 @@ func housekeeping() { } for _, m := range tMatches { - demoLoader.ParseMapL.Lock() - if _, ok := demoLoader.ParseMap[m.ShareCode]; ok { + demo := &csgo.Demo{MatchId: m.ID, ShareCode: m.ShareCode} + if demoLoader.IsLoading(demo) { log.Infof("[HK] Skipping %s: parsing in progress", m.ShareCode) - demoLoader.ParseMapL.Unlock() continue } log.Infof("[HK] Try reparsing match %d, played on %s", m.ID, m.Date) - err := demoLoader.LoadDemo(&csgo.Demo{MatchId: m.ID, ShareCode: m.ShareCode}) + err := demoLoader.LoadDemo(demo) if err != nil { log.Warningf("[HK] Failure trying to parse match %d: %v", m.ID, err) } - demoLoader.ParseMapL.Unlock() } // check for inconsistent matches