package csgo import ( "context" "fmt" "github.com/an0nfunc/go-steam/v3" "github.com/an0nfunc/go-steam/v3/csgo/protocol/protobuf" "github.com/an0nfunc/go-steam/v3/protocol/gamecoordinator" "github.com/an0nfunc/go-steam/v3/protocol/steamlang" "github.com/go-redis/cache/v8" "github.com/pkg/errors" "github.com/sethvargo/go-retry" log "github.com/sirupsen/logrus" "golang.org/x/time/rate" "google.golang.org/protobuf/proto" "os" "somegit.dev/csgowtf/csgowtfd/ent" "somegit.dev/csgowtf/csgowtfd/utils" "strings" "sync" "time" ) const ( APPID = 730 LoginFailed = iota LoginSuccess ) type DemoMatchLoaderConfig struct { Username string Password string AuthCode string Sentry string LoginKey string DB *ent.Client Worker int APIKey string RateLimit *rate.Limiter Cache *cache.Cache SprayTimeout int RetryTimeout int } type DemoMatchLoader struct { client *steam.Client GCReady bool steamLogin *steam.LogOnDetails matchRecv chan *protobuf.CMsgGCCStrike15V2_MatchList sentryFile string loginKey string db *ent.Client dp *DemoParser parseDemo chan *Demo parseMap map[string]bool parseMapL *sync.RWMutex cache *cache.Cache connectionWait retry.Backoff connectionWaitTmpl retry.Backoff connectFeedback chan int LoggedIn bool } func AccountID2SteamID(accID uint32) uint64 { return uint64(accID) + 76561197960265728 //nolint:gomnd } func SteamID2AccountID(steamID uint64) uint32 { return uint32(steamID - 76561197960265728) //nolint:gomnd } func playerStatsFromRound(round *protobuf.CMsgGCCStrike15V2_MatchmakingServerRoundStats, p *ent.Player) (kills int32, deaths int32, assists int32, headshots int32, score int32, mvps int32) { for i, acc := range round.GetReservation().GetAccountIds() { if AccountID2SteamID(acc) == p.ID { return round.GetKills()[i], round.GetDeaths()[i], round.GetAssists()[i], round.GetEnemyHeadshots()[i], round.GetScores()[i], round.GetMvps()[i] } } return 0, 0, 0, 0, 0, 0 } func (dml *DemoMatchLoader) IsLoading(demo *Demo) bool { dml.parseMapL.RLock() defer dml.parseMapL.RUnlock() if _, ok := dml.parseMap[demo.ShareCode]; ok { return true } return false } func (dml *DemoMatchLoader) unlockDemo(demo *Demo) { dml.parseMapL.Lock() defer dml.parseMapL.Unlock() delete(dml.parseMap, demo.ShareCode) } func (dml *DemoMatchLoader) lockDemo(demo *Demo) { dml.parseMapL.Lock() defer dml.parseMapL.Unlock() dml.parseMap[demo.ShareCode] = true } func (dml *DemoMatchLoader) HandleGCPacket(pkg *gamecoordinator.GCPacket) { switch pkg.MsgType { case uint32(protobuf.EGCBaseClientMsg_k_EMsgGCClientWelcome): msg := &protobuf.CMsgClientWelcome{} err := proto.Unmarshal(pkg.Body, msg) if err != nil { log.Errorf("[DL] Unable to unmarshal event %v: %v", pkg.MsgType, err) } log.Debugf("[GC] Welcome: %+v", msg) dml.GCReady = true case uint32(protobuf.EGCBaseClientMsg_k_EMsgGCClientConnectionStatus): msg := &protobuf.CMsgConnectionStatus{} err := proto.Unmarshal(pkg.Body, msg) if err != nil { log.Errorf("[GC] Unable to unmarshal event %v: %v", pkg.MsgType, err) } log.Debugf("[GC] Status: %+v", msg) if msg.GetStatus() != protobuf.GCConnectionStatus_GCConnectionStatus_HAVE_SESSION { dml.GCReady = false go dml.greetGC() } case uint32(protobuf.ECsgoGCMsg_k_EMsgGCCStrike15_v2_GC2ClientGlobalStats): msg := &protobuf.GlobalStatistics{} err := proto.Unmarshal(pkg.Body, msg) if err != nil { log.Errorf("[GC] Unable to unmarshal event %v: %v", pkg.MsgType, err) } log.Debugf("[GC] Stats: %+v", msg) dml.GCReady = true case uint32(protobuf.ECsgoGCMsg_k_EMsgGCCStrike15_v2_MatchList): msg := &protobuf.CMsgGCCStrike15V2_MatchList{} err := proto.Unmarshal(pkg.Body, msg) if err != nil { log.Errorf("[GC] Unable to unmarshal event %v: %v", pkg.MsgType, err) } dml.matchRecv <- msg default: log.Debugf("[GC] Unhandled message: %+v", pkg) } } func (dml *DemoMatchLoader) getMatchDetails(sharecode string) (*protobuf.CMsgGCCStrike15V2_MatchList, error) { if !dml.GCReady { return nil, fmt.Errorf("gc not ready") } matchID, outcomeID, tokenID, err := DecodeSharecode(sharecode) if err != nil { return nil, err } err = dml.requestDemoInfo(matchID, outcomeID, uint32(tokenID)) if err != nil { return nil, err } for matchDetails := range dml.matchRecv { if *matchDetails.Matches[0].Matchid == matchID { return matchDetails, nil } else { dml.matchRecv <- matchDetails } } return nil, err } func (dml *DemoMatchLoader) connectToSteam() error { if dml.client.Connected() { return nil } _, err := dml.client.Connect() if err != nil { return err } return nil } func (dml *DemoMatchLoader) Setup(config *DemoMatchLoaderConfig) error { dml.loginKey = config.LoginKey dml.sentryFile = config.Sentry dml.db = config.DB dml.dp = &DemoParser{} dml.parseMap = map[string]bool{} dml.parseMapL = new(sync.RWMutex) dml.cache = config.Cache dml.connectFeedback = make(chan int, 10) //nolint:gomnd dml.connectionWait = retry.WithCappedDuration(time.Minute*time.Duration(config.RetryTimeout), retry.NewExponential(time.Minute)) dml.connectionWaitTmpl = retry.WithCappedDuration(time.Minute*time.Duration(config.RetryTimeout), retry.NewExponential(time.Minute)) err := dml.dp.Setup(config.DB, config.Worker, config.SprayTimeout) if err != nil { return err } dml.steamLogin = new(steam.LogOnDetails) dml.steamLogin.Username = config.Username dml.steamLogin.Password = config.Password dml.steamLogin.AuthCode = config.AuthCode dml.steamLogin.ShouldRememberPassword = true if _, err := os.Stat(dml.sentryFile); err == nil { hash, err := os.ReadFile(dml.sentryFile) if err != nil { return err } dml.steamLogin.SentryFileHash = hash } if _, err := os.Stat(dml.loginKey); err == nil { hash, err := os.ReadFile(dml.loginKey) if err != nil { return err } dml.steamLogin.LoginKey = string(hash) } dml.client = steam.NewClient() err = steam.InitializeSteamDirectory() if err != nil { return err } dml.matchRecv = make(chan *protobuf.CMsgGCCStrike15V2_MatchList, 1000) //nolint:gomnd dml.parseDemo = make(chan *Demo, 1000) //nolint:gomnd go dml.connectLoop() go dml.steamEventHandler() go dml.demoWorker() for i := 0; i < config.Worker; i++ { go dml.gcWorker(config.APIKey, config.RateLimit) } dml.connectFeedback <- LoginFailed return nil } func (dml *DemoMatchLoader) LoadDemo(demo *Demo) error { if dml.IsLoading(demo) { log.Infof("[DL] Skipping %s: parsing in progress", demo.ShareCode) return nil } dml.lockDemo(demo) select { case dml.parseDemo <- demo: return nil default: dml.unlockDemo(demo) return fmt.Errorf("queue full") } } func (dml *DemoMatchLoader) connectLoop() { for res := range dml.connectFeedback { switch res { case LoginFailed: if sleep, ok := dml.connectionWait.Next(); !ok { time.Sleep(sleep) } else { panic("retry should never be stop") } if !dml.LoggedIn { log.Infof("[DL] Connecting to steam...") err := dml.connectToSteam() if err != nil { log.Warningf("[DL] Error connecting to steam: %v", err) } } case LoginSuccess: log.Info("[DL] Steam login successfully restored") dml.connectionWait = dml.connectionWaitTmpl } } } func (dml *DemoMatchLoader) steamEventHandler() { for event := range dml.client.Events() { switch e := event.(type) { case *steam.ConnectedEvent: log.Debug("[DL] Connected!") dml.client.Auth.LogOn(dml.steamLogin) case *steam.MachineAuthUpdateEvent: log.Debug("[DL] Got sentry!") err := os.WriteFile(dml.sentryFile, e.Hash, os.ModePerm) if err != nil { log.Errorf("[DL] Unable write sentry file: %v", err) } case *steam.LoggedOnEvent: log.Debug("[DL] Steam login success!") dml.LoggedIn = true dml.connectFeedback <- LoginSuccess dml.client.Social.SetPersonaState(steamlang.EPersonaState_Online) go dml.setPlaying() case *steam.LogOnFailedEvent: log.Debugf("[DL] Steam login denied: %+v", e) switch e.Result { case steamlang.EResult_AccountLogonDenied: log.Fatalf("[DL] Please provide AuthCode in config") case steamlang.EResult_InvalidPassword: _ = os.Remove(dml.sentryFile) _ = os.Remove(dml.loginKey) log.Warningf("[DL] Steam login failed: InvalidPassword") case steamlang.EResult_InvalidLoginAuthCode: log.Fatalf("[DL] Steam auth code wrong") default: log.Warningf("[DL] Unhandled login failed event %+v", e) } case *steam.DisconnectedEvent: log.Warningf("[DL] Steam disconnected, trying to reconnect...") dml.GCReady = false dml.LoggedIn = false dml.connectFeedback <- LoginFailed case *steam.LoginKeyEvent: log.Debug("Got login_key!") err := os.WriteFile(dml.loginKey, []byte(e.LoginKey), os.ModePerm) if err != nil { log.Errorf("[DL] Unable write login_key: %v", err) } case *steam.FatalErrorEvent: log.Debugf("[DL] Got FatalError %+v", e) case error: log.Warningf("[DL] Error: %+v", e) dml.GCReady = false dml.LoggedIn = false dml.connectFeedback <- LoginFailed default: log.Debugf("[DL] %T: %v", e, e) } } } func (dml *DemoMatchLoader) setPlaying() { dml.client.GC.SetGamesPlayed(APPID) dml.client.GC.RegisterPacketHandler(dml) go dml.greetGC() } func (dml *DemoMatchLoader) greetGC() { for !dml.GCReady { log.Debugf("[DL] Sending GC greeting") msg := protobuf.CMsgClientHello{} dml.client.GC.Write(gamecoordinator.NewGCMsgProtobuf(APPID, uint32(protobuf.EGCBaseClientMsg_k_EMsgGCClientHello), &msg)) time.Sleep(1 * time.Second) } } func (dml *DemoMatchLoader) requestDemoInfo(matchID, conclusionID uint64, tokenID uint32) error { if !dml.GCReady { return fmt.Errorf("gc not ready") } msg := protobuf.CMsgGCCStrike15V2_MatchListRequestFullGameInfo{ Matchid: &matchID, Outcomeid: &conclusionID, Token: &tokenID, } dml.client.GC.Write(gamecoordinator.NewGCMsgProtobuf(APPID, uint32(protobuf.ECsgoGCMsg_k_EMsgGCCStrike15_v2_MatchListRequestFullGameInfo), &msg)) return nil } func (dml *DemoMatchLoader) demoWorker() { for demo := range dml.dp.Done { dml.unlockDemo(demo) } } func (dml *DemoMatchLoader) handleDemo(demo *Demo, apiKey string, rl *rate.Limiter) error { defer dml.unlockDemo(demo) if !dml.GCReady { log.Infof("[DL] Postponing match %d (%s): GC not ready", demo.MatchID, demo.ShareCode) time.Sleep(5 * time.Second) dml.parseDemo <- demo return nil } matchID, _, _, err := DecodeSharecode(demo.ShareCode) if err != nil || matchID == 0 { return fmt.Errorf("error decoding sharecode %s: %w", demo.ShareCode, err) } iMatch, err := dml.db.Match.Get(context.Background(), matchID) if err != nil && !ent.IsNotFound(err) { return fmt.Errorf("error looking up match: %w", err) } else if err == nil { if !iMatch.DemoParsed && iMatch.Date.After(time.Now().UTC().AddDate(0, 0, -30)) { log.Infof("[DL] Match %d is loaded, but not parsed. Try parsing.", iMatch.ID) demo.MatchID = matchID demo.URL = iMatch.ReplayURL demo.DecryptionKey = iMatch.DecryptionKey err := dml.dp.ParseDemo(demo) if err != nil { return fmt.Errorf("error parsing match %d: %w", demo.MatchID, err) } return nil } log.Infof("[DL] Skipped match %d: already loaded", matchID) return nil } log.Infof("[DL] Requesting match %d from GC", matchID) t := time.Now() matchDetails, err := dml.getMatchDetails(demo.ShareCode) if err != nil { return fmt.Errorf("error getting match-details for %d: %w", demo.MatchID, err) } log.Infof("[DL] Received matchdetails for match %d (%s)", matchID, time.Since(t)) // init tx tx, err := dml.db.Tx(context.Background()) if err != nil { return errors.Wrap(err, "error creating transaction") } matchZero := matchDetails.GetMatches()[0] lastRound := matchZero.GetRoundstatsall()[len(matchZero.Roundstatsall)-1] var players []*ent.Player for _, accountID := range lastRound.GetReservation().GetAccountIds() { tPlayer, err := utils.Player(tx.Client(), AccountID2SteamID(accountID), apiKey, rl) if err != nil { err = utils.Rollback(tx, err) return fmt.Errorf("error getting player for steamid %d: %w", AccountID2SteamID(accountID), err) } players = append(players, tPlayer) } demo.URL = lastRound.GetMap() demo.MatchID = matchZero.GetMatchid() demo.DecryptionKey = []byte(strings.ToUpper(fmt.Sprintf("%016x", matchZero.GetWatchablematchinfo().GetClDecryptdataKeyPub()))) tMatch, err := tx.Match.Create(). SetID(matchZero.GetMatchid()). AddPlayers(players...). SetDate(time.Unix(int64(matchZero.GetMatchtime()), 0).UTC()). SetMaxRounds(int(lastRound.GetMaxRounds())). SetDuration(int(lastRound.GetMatchDuration())). SetShareCode(demo.ShareCode). SetReplayURL(lastRound.GetMap()). SetScoreTeamA(int(lastRound.GetTeamScores()[0])). SetScoreTeamB(int(lastRound.GetTeamScores()[1])). SetMatchResult(int(lastRound.GetMatchResult())). SetDecryptionKey(demo.DecryptionKey). Save(context.Background()) if err != nil { err = utils.Rollback(tx, err) return fmt.Errorf("error creating match %d: %w", matchZero.GetMatchid(), err) } for i, mPlayer := range players { var ( teamID int mk2, mk3, mk4, mk5 uint ) if i > 4 { teamID = 2 } else { teamID = 1 } var oldKills int32 for _, round := range matchZero.GetRoundstatsall() { kills, _, _, _, _, _ := playerStatsFromRound(round, mPlayer) switch kills - oldKills { case 2: mk2++ case 3: mk3++ case 4: mk4++ case 5: mk5++ } oldKills = kills } kills, deaths, assists, hs, score, mvp := playerStatsFromRound(lastRound, mPlayer) err = tx.MatchPlayer.Create(). SetMatches(tMatch). SetPlayers(mPlayer). SetTeamID(teamID). SetKills(int(kills)). SetDeaths(int(deaths)). SetAssists(int(assists)). SetMvp(uint(mvp)). SetScore(int(score)). SetHeadshot(int(hs)). SetMk2(mk2). SetMk3(mk3). SetMk4(mk4). SetMk5(mk5). Exec(context.Background()) if err != nil { err = utils.Rollback(tx, err) return fmt.Errorf("error creating stats for player %d in match %d: %w", mPlayer.ID, tMatch.ID, err) } } // clear cache or regen values player for _, p := range players { err = dml.cache.Delete(context.Background(), fmt.Sprintf(utils.SideMetaCacheKey, p.ID)) if err != nil { err = utils.Rollback(tx, err) return fmt.Errorf("error deleting cache key %s: %w", fmt.Sprintf(utils.SideMetaCacheKey, p.ID), err) } w, l, t, err := utils.GetWinLossTieForPlayer(p) if err != nil { err = utils.Rollback(tx, err) return fmt.Errorf("error calculating WinLossTie for player %d: %w", p.ID, err) } err = p.Update().SetWins(w).SetTies(t).SetLooses(l).Exec(context.Background()) if err != nil { err = utils.Rollback(tx, err) return fmt.Errorf("error saving WinLossTie for player %d: %w", p.ID, err) } } err = tx.Commit() if err != nil { return errors.Wrap(err, "[DP] error committing to db") } err = dml.dp.ParseDemo(demo) if err != nil { return fmt.Errorf("error queueing demo %d for parsing: %w", demo.MatchID, err) } return nil } func (dml *DemoMatchLoader) gcWorker(apiKey string, rl *rate.Limiter) { for demo := range dml.parseDemo { err := dml.handleDemo(demo, apiKey, rl) if err != nil { log.Warningf("[DL] Error handling demo: %v", err) } } }