diff --git a/csgo/demo_loader.go b/csgo/demo_loader.go index 01edd35..5665afc 100644 --- a/csgo/demo_loader.go +++ b/csgo/demo_loader.go @@ -56,6 +56,8 @@ type DemoMatchLoader struct { parseDemo chan *Demo parseMap map[string]bool parseMapL *sync.RWMutex + parsePlayerMap map[uint32]*sync.RWMutex + parsePlayerMapL *sync.RWMutex cache *cache.Cache connectionWait uint64 connectFeedback chan int @@ -344,7 +346,7 @@ func (dml *DemoMatchLoader) greetGC() { log.Debugf("[DL] Sending GC greeting") msg := protobuf.CMsgClientHello{} dml.client.GC.Write(gamecoordinator.NewGCMsgProtobuf(APPID, uint32(protobuf.EGCBaseClientMsg_k_EMsgGCClientHello), &msg)) - time.Sleep(500 * time.Millisecond) + time.Sleep(1 * time.Second) } } @@ -368,178 +370,190 @@ func (dml *DemoMatchLoader) demoWorker() { } } +func (dml *DemoMatchLoader) handleDemo(demo *Demo, apiKey string, rl ratelimit.Limiter) error { + if dml.IsLoading(demo) { + log.Infof("[DL] Skipping %s: parsing in progress", demo.ShareCode) + return nil + } + dml.lockDemo(demo) + defer dml.unlockDemo(demo) + + if !dml.GCReady { + log.Infof("[DL] Postponing match %d (%s): GC not ready", demo.MatchId, demo.ShareCode) + time.Sleep(5 * time.Second) + dml.parseDemo <- demo + return nil + } + + matchId, _, _, err := DecodeSharecode(demo.ShareCode) + if err != nil || matchId == 0 { + return fmt.Errorf("error decoding sharecode %s: %w", demo.ShareCode, err) + } + + iMatch, err := dml.db.Match.Get(context.Background(), matchId) + if err != nil && !ent.IsNotFound(err) { + return fmt.Errorf("error looking up match: %w", err) + } else if err == nil { + if iMatch.DemoParsed == false && iMatch.Date.After(time.Now().UTC().AddDate(0, 0, -30)) { + log.Infof("[DL] Match %d is loaded, but not parsed. Try parsing.", iMatch.ID) + demo.MatchId = matchId + demo.Url = iMatch.ReplayURL + demo.DecryptionKey = iMatch.DecryptionKey + err := dml.dp.ParseDemo(demo) + if err != nil { + return fmt.Errorf("error parsing match %d: %w", demo.MatchId, err) + } + return nil + } + + log.Infof("[DL] Skipped match %d: already loaded", matchId) + return nil + } + + log.Infof("[DL] Requesting match %d from GC", matchId) + t := time.Now() + + matchDetails, err := dml.getMatchDetails(demo.ShareCode) + if err != nil { + return fmt.Errorf("error getting match-details for %d: %w", demo.MatchId, err) + } + + log.Infof("[DL] Recieved matchdetails for match %d (%s)", matchId, time.Since(t)) + + matchZero := matchDetails.GetMatches()[0] + lastRound := matchZero.GetRoundstatsall()[len(matchZero.Roundstatsall)-1] + var players []*ent.Player + + dml.parsePlayerMapL.Lock() + for _, accID := range lastRound.GetReservation().GetAccountIds() { + if dml.parsePlayerMap[accID] == nil { + dml.parsePlayerMap[accID] = new(sync.RWMutex) + } + } + dml.parsePlayerMapL.Unlock() + + dml.parsePlayerMapL.RLock() + for _, accountId := range lastRound.GetReservation().GetAccountIds() { + dml.parsePlayerMap[accountId].Lock() + tPlayer, err := utils.Player(dml.db, AccountId2SteamId(accountId), apiKey, rl) + if err != nil { + dml.parsePlayerMap[accountId].Unlock() + dml.parsePlayerMapL.RUnlock() + return fmt.Errorf("error getting player for steamid %d: %w", AccountId2SteamId(accountId), err) + } + players = append(players, tPlayer) + } + dml.parsePlayerMapL.RUnlock() + defer func() { + dml.parsePlayerMapL.RLock() + for _, accountId := range lastRound.GetReservation().GetAccountIds() { + dml.parsePlayerMap[accountId].Unlock() + } + dml.parsePlayerMapL.RUnlock() + }() + + demo.Url = lastRound.GetMap() + demo.MatchId = matchZero.GetMatchid() + demo.DecryptionKey = []byte(strings.ToUpper(strconv.FormatUint(matchZero.GetWatchablematchinfo().GetClDecryptdataKeyPub(), 16))) + + tMatch, err := dml.db.Match.Create(). + SetID(matchZero.GetMatchid()). + AddPlayers(players...). + SetDate(time.Unix(int64(matchZero.GetMatchtime()), 0).UTC()). + SetMaxRounds(int(lastRound.GetMaxRounds())). + SetDuration(int(lastRound.GetMatchDuration())). + SetShareCode(demo.ShareCode). + SetReplayURL(lastRound.GetMap()). + SetScoreTeamA(int(lastRound.GetTeamScores()[0])). + SetScoreTeamB(int(lastRound.GetTeamScores()[1])). + SetMatchResult(int(lastRound.GetMatchResult())). + SetDecryptionKey(demo.DecryptionKey). + Save(context.Background()) + if err != nil { + return fmt.Errorf("error creating match %d: %w", matchZero.GetMatchid(), err) + } + + for i, mPlayer := range players { + var ( + teamId int + mk2, mk3, mk4, mk5 uint + ) + + if i > 4 { + teamId = 2 + } else { + teamId = 1 + } + + var oldKills int32 + for _, round := range matchZero.GetRoundstatsall() { + kills, _, _, _, _, _ := playerStatsFromRound(round, mPlayer) + + killDiff := kills - oldKills + + switch killDiff { + case 2: + mk2++ + case 3: + mk3++ + case 4: + mk4++ + case 5: + mk5++ + } + oldKills = kills + } + + kills, deaths, assists, hs, score, mvp := playerStatsFromRound(lastRound, mPlayer) + err := dml.db.MatchPlayer.Create(). + SetMatches(tMatch). + SetPlayers(mPlayer). + SetTeamID(teamId). + SetKills(int(kills)). + SetDeaths(int(deaths)). + SetAssists(int(assists)). + SetMvp(uint(mvp)). + SetScore(int(score)). + SetHeadshot(int(hs)). + SetMk2(mk2). + SetMk3(mk3). + SetMk4(mk4). + SetMk5(mk5). + Exec(context.Background()) + if err != nil { + return fmt.Errorf("error creating stats for player %d in match %d: %w", mPlayer.ID, tMatch.ID, err) + } + } + + // clear cache or regen values player + for _, p := range players { + err = dml.cache.Delete(context.Background(), fmt.Sprintf(utils.SideMetaCacheKey, p.ID)) + if err != nil { + return fmt.Errorf("error deleting cache key %s: %w", fmt.Sprintf(utils.SideMetaCacheKey, p.ID), err) + } + + w, l, t, err := utils.GetWinLossTieForPlayer(p) + if err != nil { + return fmt.Errorf("error calculating WinLossTie for player %d: %w", p.ID, err) + } + err = p.Update().SetWins(w).SetTies(t).SetLooses(l).Exec(context.Background()) + if err != nil { + return fmt.Errorf("error saving WinLossTie for player %d: %w", p.ID, err) + } + } + + err = dml.dp.ParseDemo(demo) + if err != nil { + return fmt.Errorf("error queueing demo %d for parsing: %w", demo.MatchId, err) + } + return nil +} + func (dml *DemoMatchLoader) gcWorker(apiKey string, rl ratelimit.Limiter) { for demo := range dml.parseDemo { - if dml.IsLoading(demo) { - log.Infof("[DL] Skipping %s: parsing in progress", demo.ShareCode) - continue - } - dml.lockDemo(demo) - - if !dml.GCReady { - log.Infof("[DL] Postponing match %d (%s): GC not ready", demo.MatchId, demo.ShareCode) - time.Sleep(5 * time.Second) - dml.unlockDemo(demo) - dml.parseDemo <- demo - continue - } - - matchId, _, _, err := DecodeSharecode(demo.ShareCode) - if err != nil || matchId == 0 { - log.Warningf("[DL] Can't parse match with sharecode %s: %v", demo.ShareCode, err) - dml.unlockDemo(demo) - continue - } - - iMatch, err := dml.db.Match.Get(context.Background(), matchId) - if err != nil && !ent.IsNotFound(err) { - log.Errorf("[DL] Failure trying to lookup match %d in db: %v", matchId, err) - dml.unlockDemo(demo) - continue - } else if err == nil { - if iMatch.DemoParsed == false && iMatch.Date.After(time.Now().UTC().AddDate(0, 0, -30)) { - log.Infof("[DL] Match %d is loaded, but not parsed. Try parsing.", iMatch.ID) - demo.MatchId = matchId - demo.Url = iMatch.ReplayURL - demo.DecryptionKey = iMatch.DecryptionKey - err := dml.dp.ParseDemo(demo) - if err != nil { - log.Warningf("[DL] Parsing demo from match %d failed: %v", demo.MatchId, err) - dml.unlockDemo(demo) - } - continue - } - - log.Infof("[DL] Skipped match %d: already loaded", matchId) - dml.unlockDemo(demo) - continue - } - - log.Infof("[DL] Requesting match %d from GC", matchId) - t := time.Now() - - matchDetails, err := dml.getMatchDetails(demo.ShareCode) + err := dml.handleDemo(demo, apiKey, rl) if err != nil { - log.Warningf("[DL] Failure to get match-details for %d from GC: %v", demo.MatchId, err) - dml.unlockDemo(demo) - continue - } - - log.Infof("[DL] Recieved matchdetails for match %d (%s)", matchId, time.Since(t)) - - matchZero := matchDetails.GetMatches()[0] - lastRound := matchZero.GetRoundstatsall()[len(matchZero.Roundstatsall)-1] - var players []*ent.Player - - for _, accountId := range lastRound.GetReservation().GetAccountIds() { - tPlayer, err := utils.Player(dml.db, AccountId2SteamId(accountId), apiKey, rl) - if err != nil { - log.Warningf("[DL] Unable to get player for steamid %d: %v", AccountId2SteamId(accountId), err) - continue - } - players = append(players, tPlayer) - } - - demo.Url = lastRound.GetMap() - demo.MatchId = matchZero.GetMatchid() - demo.DecryptionKey = []byte(strings.ToUpper(strconv.FormatUint(matchZero.GetWatchablematchinfo().GetClDecryptdataKeyPub(), 16))) - - tMatch, err := dml.db.Match.Create(). - SetID(matchZero.GetMatchid()). - AddPlayers(players...). - SetDate(time.Unix(int64(matchZero.GetMatchtime()), 0).UTC()). - SetMaxRounds(int(lastRound.GetMaxRounds())). - SetDuration(int(lastRound.GetMatchDuration())). - SetShareCode(demo.ShareCode). - SetReplayURL(lastRound.GetMap()). - SetScoreTeamA(int(lastRound.GetTeamScores()[0])). - SetScoreTeamB(int(lastRound.GetTeamScores()[1])). - SetMatchResult(int(lastRound.GetMatchResult())). - SetDecryptionKey(demo.DecryptionKey). - Save(context.Background()) - if err != nil { - log.Warningf("[DL] Unable to create match %d: %v", matchZero.GetMatchid(), err) - dml.unlockDemo(demo) - continue - } - - for i, mPlayer := range players { - var ( - teamId int - mk2 uint - mk3 uint - mk4 uint - mk5 uint - ) - - if i > 4 { - teamId = 2 - } else { - teamId = 1 - } - - var oldKills int32 - for _, round := range matchZero.GetRoundstatsall() { - kills, _, _, _, _, _ := playerStatsFromRound(round, mPlayer) - - killDiff := kills - oldKills - - switch killDiff { - case 2: - mk2++ - case 3: - mk3++ - case 4: - mk4++ - case 5: - mk5++ - } - oldKills = kills - } - - kills, deaths, assists, hs, score, mvp := playerStatsFromRound(lastRound, mPlayer) - err := dml.db.MatchPlayer.Create(). - SetMatches(tMatch). - SetPlayers(mPlayer). - SetTeamID(teamId). - SetKills(int(kills)). - SetDeaths(int(deaths)). - SetAssists(int(assists)). - SetMvp(uint(mvp)). - SetScore(int(score)). - SetHeadshot(int(hs)). - SetMk2(mk2). - SetMk3(mk3). - SetMk4(mk4). - SetMk5(mk5). - Exec(context.Background()) - if err != nil { - log.Warningf("[DL] Unable to create stats for player %d in match %d: %v", mPlayer.ID, tMatch.ID, err) - } - } - - // clear cache or regen values player - for _, p := range players { - err = dml.cache.Delete(context.Background(), fmt.Sprintf(utils.SideMetaCacheKey, p.ID)) - if err != nil { - log.Warningf("[DL] Unable to delete cache key %s: %v", fmt.Sprintf(utils.SideMetaCacheKey, p.ID), err) - } - - w, l, t, err := utils.GetWinLossTieForPlayer(p) - if err != nil { - log.Warningf("[DL] Failure to calculate WinLossTie for player %d: %v", p.ID, err) - continue - } - err = p.Update().SetWins(w).SetTies(t).SetLooses(l).Exec(context.Background()) - if err != nil { - log.Warningf("[DL] Failure to save WinLossTie for player %d: %v", p.ID, err) - } - } - - err = dml.dp.ParseDemo(demo) - if err != nil { - log.Warningf("[DL] Can't queue demo %d for parsing: %v", demo.MatchId, err) - dml.unlockDemo(demo) + log.Warningf("[DL] Error handling demo: %v", err) } } }