// Source: csgowtfd/csgo/demo_loader.go (551 lines, 15 KiB, Go)

package csgo
import (
"context"
"fmt"
"github.com/an0nfunc/go-steam/v3"
"github.com/an0nfunc/go-steam/v3/csgo/protocol/protobuf"
"github.com/an0nfunc/go-steam/v3/protocol/gamecoordinator"
"github.com/an0nfunc/go-steam/v3/protocol/steamlang"
"github.com/go-redis/cache/v8"
"github.com/pkg/errors"
"github.com/sethvargo/go-retry"
log "github.com/sirupsen/logrus"
"golang.org/x/time/rate"
"google.golang.org/protobuf/proto"
"os"
"somegit.dev/csgowtf/csgowtfd/ent"
"somegit.dev/csgowtf/csgowtfd/utils"
"strings"
"sync"
"time"
)
// Application ID and the signal values exchanged on connectFeedback.
const (
// APPID is the Steam application ID passed to SetGamesPlayed and to every
// game-coordinator message built in this file.
APPID = 730
// LoginFailed is sent on connectFeedback to request a (re)connect attempt.
// iota counts from the first spec of this block (APPID), so its value is 1.
LoginFailed = iota
// LoginSuccess is sent on connectFeedback after a successful logon; its
// value is 2.
LoginSuccess
)
// DemoMatchLoaderConfig carries everything DemoMatchLoader.Setup needs to
// initialise a loader.
type DemoMatchLoaderConfig struct {
// Username, Password and AuthCode are copied into the Steam logon details.
Username string
Password string
AuthCode string
// Sentry is the path of the sentry file: read during Setup if it exists and
// rewritten whenever Steam sends a machine-auth update.
Sentry string
// LoginKey is the path of the login-key file: read during Setup if it
// exists and rewritten whenever Steam sends a new login key.
LoginKey string
// DB is the ent client used for match and player persistence; it is also
// handed to the demo parser.
DB *ent.Client
// Worker is the number of gcWorker goroutines started by Setup; the same
// value is passed to the demo parser's Setup.
Worker int
// APIKey and RateLimit are forwarded to utils.Player when players of a
// match are looked up.
APIKey string
RateLimit *rate.Limiter
// Cache is the cache whose per-player side-meta keys are deleted after a
// match has been stored.
Cache *cache.Cache
// SprayTimeout is passed through to the demo parser's Setup.
SprayTimeout int
// RetryTimeout caps the reconnect backoff; it is interpreted in minutes.
RetryTimeout int
}
// DemoMatchLoader keeps a Steam/game-coordinator session alive, requests
// match details for share codes, stores them in the database and hands the
// demos to the demo parser. It must be initialised with Setup before use.
type DemoMatchLoader struct {
// client is the Steam client created in Setup.
client *steam.Client
// GCReady is set once the game coordinator has answered (welcome or global
// stats message) and cleared on disconnects, errors and lost GC sessions.
GCReady bool
// steamLogin holds the logon details sent after every successful connect.
steamLogin *steam.LogOnDetails
// matchRecv buffers match lists received from the game coordinator until a
// worker waiting in getMatchDetails picks them up.
matchRecv chan *protobuf.CMsgGCCStrike15V2_MatchList
// sentryFile and loginKey are the file paths taken from the config.
sentryFile string
loginKey string
db *ent.Client
// dp is the demo parser that receives demos once their match is stored.
dp *DemoParser
// parseDemo is the queue consumed by the gcWorker goroutines.
parseDemo chan *Demo
// parseMap holds the share codes currently queued or being handled;
// parseMapL guards it.
parseMap map[string]bool
parseMapL *sync.RWMutex
cache *cache.Cache
// connectionWait yields the delay before the next connect attempt;
// connectionWaitTmpl is assigned to it after a successful login.
connectionWait retry.Backoff
connectionWaitTmpl retry.Backoff
// connectFeedback carries LoginFailed / LoginSuccess signals to connectLoop.
connectFeedback chan int
// LoggedIn reflects whether the last logon succeeded and no disconnect or
// error has been seen since.
LoggedIn bool
}
// AccountID2SteamID converts a 32-bit Steam account ID into its 64-bit
// SteamID by adding the fixed offset 76561197960265728.
func AccountID2SteamID(accID uint32) uint64 {
	const steamID64Offset uint64 = 76561197960265728

	return steamID64Offset + uint64(accID)
}
// SteamID2AccountID converts a 64-bit SteamID into the 32-bit account ID by
// subtracting the fixed offset 76561197960265728 and truncating to 32 bits.
func SteamID2AccountID(steamID uint64) uint32 {
	const steamID64Offset uint64 = 76561197960265728

	accountID := steamID - steamID64Offset
	return uint32(accountID)
}
// playerStatsFromRound returns the stats recorded in round for player p.
// The player is located by comparing p.ID with the SteamID of every account
// in the round's reservation; the position found there indexes the per-player
// stat slices. All results are zero when p is not part of the reservation.
func playerStatsFromRound(round *protobuf.CMsgGCCStrike15V2_MatchmakingServerRoundStats, p *ent.Player) (kills int32,
	deaths int32, assists int32, headshots int32, score int32, mvps int32) {
	accountIDs := round.GetReservation().GetAccountIds()
	for idx := range accountIDs {
		if AccountID2SteamID(accountIDs[idx]) != p.ID {
			continue
		}

		kills = round.GetKills()[idx]
		deaths = round.GetDeaths()[idx]
		assists = round.GetAssists()[idx]
		headshots = round.GetEnemyHeadshots()[idx]
		score = round.GetScores()[idx]
		mvps = round.GetMvps()[idx]
		return kills, deaths, assists, headshots, score, mvps
	}

	return 0, 0, 0, 0, 0, 0
}
// IsLoading reports whether the demo's share code is currently registered in
// parseMap, i.e. the demo is queued or being handled.
func (dml *DemoMatchLoader) IsLoading(demo *Demo) bool {
	dml.parseMapL.RLock()
	defer dml.parseMapL.RUnlock()

	_, loading := dml.parseMap[demo.ShareCode]
	return loading
}
// unlockDemo removes the demo's share code from parseMap so the demo can be
// submitted again.
func (dml *DemoMatchLoader) unlockDemo(demo *Demo) {
	shareCode := demo.ShareCode

	dml.parseMapL.Lock()
	defer dml.parseMapL.Unlock()

	delete(dml.parseMap, shareCode)
}
// lockDemo records the demo's share code in parseMap, marking the demo as
// in progress.
func (dml *DemoMatchLoader) lockDemo(demo *Demo) {
	shareCode := demo.ShareCode

	dml.parseMapL.Lock()
	defer dml.parseMapL.Unlock()

	dml.parseMap[shareCode] = true
}
// HandleGCPacket processes a packet received from the game coordinator. The
// loader registers itself as packet handler in setPlaying.
//
// Handled message types:
//   - client welcome and global stats: mark the GC as ready;
//   - connection status: when the session is lost, mark the GC as not ready
//     and start greeting it again;
//   - match list: forward the decoded list to matchRecv.
//
// Everything else is only logged at debug level.
func (dml *DemoMatchLoader) HandleGCPacket(pkg *gamecoordinator.GCPacket) {
	switch pkg.MsgType {
	case uint32(protobuf.EGCBaseClientMsg_k_EMsgGCClientWelcome):
		msg := &protobuf.CMsgClientWelcome{}
		if err := proto.Unmarshal(pkg.Body, msg); err != nil {
			log.Errorf("[DL] Unable to unmarshal event %v: %v", pkg.MsgType, err)
		}
		log.Debugf("[GC] Welcome: %+v", msg)
		// The message type alone signals readiness, so the flag is set even
		// when the body could not be decoded.
		dml.GCReady = true
	case uint32(protobuf.EGCBaseClientMsg_k_EMsgGCClientConnectionStatus):
		msg := &protobuf.CMsgConnectionStatus{}
		if err := proto.Unmarshal(pkg.Body, msg); err != nil {
			log.Errorf("[GC] Unable to unmarshal event %v: %v", pkg.MsgType, err)
		}
		log.Debugf("[GC] Status: %+v", msg)
		if msg.GetStatus() != protobuf.GCConnectionStatus_GCConnectionStatus_HAVE_SESSION {
			dml.GCReady = false
			go dml.greetGC()
		}
	case uint32(protobuf.ECsgoGCMsg_k_EMsgGCCStrike15_v2_GC2ClientGlobalStats):
		msg := &protobuf.GlobalStatistics{}
		if err := proto.Unmarshal(pkg.Body, msg); err != nil {
			log.Errorf("[GC] Unable to unmarshal event %v: %v", pkg.MsgType, err)
		}
		log.Debugf("[GC] Stats: %+v", msg)
		dml.GCReady = true
	case uint32(protobuf.ECsgoGCMsg_k_EMsgGCCStrike15_v2_MatchList):
		msg := &protobuf.CMsgGCCStrike15V2_MatchList{}
		if err := proto.Unmarshal(pkg.Body, msg); err != nil {
			log.Errorf("[GC] Unable to unmarshal event %v: %v", pkg.MsgType, err)
			// Never hand an undecodable list to the workers: they index into
			// its matches and would act on garbage.
			return
		}
		dml.matchRecv <- msg
	default:
		log.Debugf("[GC] Unhandled message: %+v", pkg)
	}
}
// getMatchDetails requests the full match info for sharecode from the game
// coordinator and waits on matchRecv for the answer.
//
// It returns an error when the GC is not ready, the share code cannot be
// decoded, the GC answers with an empty match list, or matchRecv is closed
// before an answer arrived. Lists that belong to a different match are put
// back on matchRecv for the worker that is waiting for them.
func (dml *DemoMatchLoader) getMatchDetails(sharecode string) (*protobuf.CMsgGCCStrike15V2_MatchList, error) {
	if !dml.GCReady {
		return nil, fmt.Errorf("gc not ready")
	}

	matchID, outcomeID, tokenID, err := DecodeSharecode(sharecode)
	if err != nil {
		return nil, err
	}

	if err := dml.requestDemoInfo(matchID, outcomeID, uint32(tokenID)); err != nil {
		return nil, err
	}

	for matchDetails := range dml.matchRecv {
		matches := matchDetails.GetMatches()
		if len(matches) == 0 {
			// An empty list carries no match id and therefore cannot be
			// attributed to any request; report it instead of indexing into it.
			return nil, fmt.Errorf("gc returned an empty match list while waiting for match %d", matchID)
		}

		// The generated getter is used so a missing id compares as 0 instead
		// of dereferencing a nil pointer.
		if matches[0].GetMatchid() == matchID {
			return matchDetails, nil
		}

		// Not the match requested here: hand it back for another worker.
		dml.matchRecv <- matchDetails
	}

	return nil, fmt.Errorf("match channel closed while waiting for match %d", matchID)
}
// connectToSteam opens the connection to Steam unless the client reports that
// it is already connected. It returns the error of the connect call, if any.
func (dml *DemoMatchLoader) connectToSteam() error {
	if dml.client.Connected() {
		return nil
	}

	_, err := dml.client.Connect()
	return err
}
// Setup initialises the loader from config and starts its background
// goroutines: the connect loop, the Steam event handler, the parser-done
// worker and config.Worker GC workers. Finally it queues a LoginFailed signal
// so the connect loop performs the initial connection attempt.
//
// An error is returned when the demo parser cannot be set up, an existing
// sentry or login-key file cannot be read, or the Steam directory cannot be
// initialised.
func (dml *DemoMatchLoader) Setup(config *DemoMatchLoaderConfig) error {
	dml.loginKey = config.LoginKey
	dml.sentryFile = config.Sentry
	dml.db = config.DB
	dml.dp = &DemoParser{}
	dml.parseMap = make(map[string]bool)
	dml.parseMapL = &sync.RWMutex{}
	dml.cache = config.Cache
	dml.connectFeedback = make(chan int, 10)

	// Reconnect delays grow exponentially from one minute up to RetryTimeout minutes.
	maxWait := time.Minute * time.Duration(config.RetryTimeout)
	dml.connectionWait = retry.WithCappedDuration(maxWait, retry.NewExponential(time.Minute))
	dml.connectionWaitTmpl = retry.WithCappedDuration(maxWait, retry.NewExponential(time.Minute))

	if err := dml.dp.Setup(config.DB, config.Worker, config.SprayTimeout); err != nil {
		return err
	}

	dml.steamLogin = &steam.LogOnDetails{
		Username:               config.Username,
		Password:               config.Password,
		AuthCode:               config.AuthCode,
		ShouldRememberPassword: true,
	}

	// Stored credentials are optional: a file that cannot be stat'ed is
	// skipped, a file that exists but cannot be read is an error.
	if _, statErr := os.Stat(dml.sentryFile); statErr == nil {
		sentry, readErr := os.ReadFile(dml.sentryFile)
		if readErr != nil {
			return readErr
		}
		dml.steamLogin.SentryFileHash = sentry
	}
	if _, statErr := os.Stat(dml.loginKey); statErr == nil {
		key, readErr := os.ReadFile(dml.loginKey)
		if readErr != nil {
			return readErr
		}
		dml.steamLogin.LoginKey = string(key)
	}

	dml.client = steam.NewClient()
	if err := steam.InitializeSteamDirectory(); err != nil {
		return err
	}

	dml.matchRecv = make(chan *protobuf.CMsgGCCStrike15V2_MatchList, 1000)
	dml.parseDemo = make(chan *Demo, 1000)

	go dml.connectLoop()
	go dml.steamEventHandler()
	go dml.demoWorker()
	for started := 0; started < config.Worker; started++ {
		go dml.gcWorker(config.APIKey, config.RateLimit)
	}

	// Kick off the first connection attempt.
	dml.connectFeedback <- LoginFailed
	return nil
}
// LoadDemo queues demo for loading. A demo whose share code is already in
// progress is skipped without error. When the queue is full the demo is
// released again and an error is returned.
func (dml *DemoMatchLoader) LoadDemo(demo *Demo) error {
	// Check and mark under a single write lock. Checking via IsLoading and
	// marking via lockDemo are two separate critical sections, which lets two
	// concurrent callers both pass the check and enqueue the same demo twice.
	alreadyLoading := func() bool {
		dml.parseMapL.Lock()
		defer dml.parseMapL.Unlock()

		if _, ok := dml.parseMap[demo.ShareCode]; ok {
			return true
		}
		dml.parseMap[demo.ShareCode] = true
		return false
	}()
	if alreadyLoading {
		log.Infof("[DL] Skipping %s: parsing in progress", demo.ShareCode)
		return nil
	}

	// Non-blocking send: callers get an error instead of stalling on a full queue.
	select {
	case dml.parseDemo <- demo:
		return nil
	default:
		dml.unlockDemo(demo)
		return fmt.Errorf("queue full")
	}
}
// connectLoop reacts to the signals on connectFeedback until the channel is
// closed. On LoginFailed it waits for the next backoff delay and, unless the
// loader is logged in by then, tries to connect to Steam. On LoginSuccess it
// replaces the current backoff with connectionWaitTmpl. Other values are
// ignored.
func (dml *DemoMatchLoader) connectLoop() {
	for res := range dml.connectFeedback {
		if res == LoginSuccess {
			log.Info("[DL] Steam login successfully restored")
			// NOTE(review): this copies the interface value only; whether the
			// delays really start over afterwards depends on how retry.Backoff
			// keeps its state — confirm.
			dml.connectionWait = dml.connectionWaitTmpl
			continue
		}
		if res != LoginFailed {
			continue
		}

		// The second result of Next tells the caller to stop retrying, which
		// the backoff configured for this loader is not expected to do.
		sleep, stop := dml.connectionWait.Next()
		if stop {
			panic("retry should never stop")
		}
		time.Sleep(sleep)

		if dml.LoggedIn {
			continue
		}
		log.Infof("[DL] Connecting to steam...")
		if err := dml.connectToSteam(); err != nil {
			log.Warningf("[DL] Error connecting to steam: %v", err)
		}
	}
}
// steamEventHandler consumes the Steam client's event stream until it is
// closed and drives the login state machine:
//   - connected: log on with the stored details;
//   - machine-auth update / login key: persist the received credential;
//   - logged on: mark the loader as logged in, signal LoginSuccess and start
//     playing the game so the game coordinator can be reached;
//   - logon failed: abort on missing or wrong auth codes, drop the stored
//     credentials on an invalid password;
//   - disconnected / error: clear the ready flags and signal LoginFailed so
//     the connect loop reconnects.
func (dml *DemoMatchLoader) steamEventHandler() {
	// The sentry hash and the login key are credentials, so newly created
	// files are readable and writable by the owner only. Files that already
	// exist keep their current mode.
	const credentialFileMode = 0o600

	for event := range dml.client.Events() {
		switch e := event.(type) {
		case *steam.ConnectedEvent:
			log.Debug("[DL] Connected!")
			dml.client.Auth.LogOn(dml.steamLogin)
		case *steam.MachineAuthUpdateEvent:
			log.Debug("[DL] Got sentry!")
			if err := os.WriteFile(dml.sentryFile, e.Hash, credentialFileMode); err != nil {
				log.Errorf("[DL] Unable write sentry file: %v", err)
			}
		case *steam.LoggedOnEvent:
			log.Debug("[DL] Steam login success!")
			dml.LoggedIn = true
			dml.connectFeedback <- LoginSuccess
			dml.client.Social.SetPersonaState(steamlang.EPersonaState_Online)
			go dml.setPlaying()
		case *steam.LogOnFailedEvent:
			log.Debugf("[DL] Steam login denied: %+v", e)
			switch e.Result { //nolint:exhaustive
			case steamlang.EResult_AccountLogonDenied:
				log.Fatalf("[DL] Please provide AuthCode in config")
			case steamlang.EResult_InvalidPassword:
				// Best effort: the stored credentials are stale, and the files
				// may legitimately not exist.
				_ = os.Remove(dml.sentryFile)
				_ = os.Remove(dml.loginKey)
				log.Warningf("[DL] Steam login failed: InvalidPassword")
			case steamlang.EResult_InvalidLoginAuthCode:
				log.Fatalf("[DL] Steam auth code wrong")
			default:
				log.Warningf("[DL] Unhandled login failed event %+v", e)
			}
		case *steam.DisconnectedEvent:
			log.Warningf("[DL] Steam disconnected, trying to reconnect...")
			dml.GCReady = false
			dml.LoggedIn = false
			dml.connectFeedback <- LoginFailed
		case *steam.LoginKeyEvent:
			log.Debug("Got login_key!")
			if err := os.WriteFile(dml.loginKey, []byte(e.LoginKey), credentialFileMode); err != nil {
				log.Errorf("[DL] Unable write login_key: %v", err)
			}
		case *steam.FatalErrorEvent:
			log.Debugf("[DL] Got FatalError %+v", e)
		case error:
			log.Warningf("[DL] Error: %+v", e)
			dml.GCReady = false
			dml.LoggedIn = false
			dml.connectFeedback <- LoginFailed
		default:
			log.Debugf("[DL] %T: %v", e, e)
		}
	}
}
// setPlaying announces APPID as the game being played, registers the loader
// as handler for game-coordinator packets and starts greeting the game
// coordinator in a separate goroutine.
func (dml *DemoMatchLoader) setPlaying() {
dml.client.GC.SetGamesPlayed(APPID)
dml.client.GC.RegisterPacketHandler(dml)
go dml.greetGC()
}
// greetGC sends a client hello to the game coordinator once per second until
// GCReady is set.
func (dml *DemoMatchLoader) greetGC() {
	for {
		if dml.GCReady {
			return
		}

		log.Debugf("[DL] Sending GC greeting")
		hello := &protobuf.CMsgClientHello{}
		helloType := uint32(protobuf.EGCBaseClientMsg_k_EMsgGCClientHello)
		dml.client.GC.Write(gamecoordinator.NewGCMsgProtobuf(APPID, helloType, hello))
		time.Sleep(1 * time.Second)
	}
}
// requestDemoInfo asks the game coordinator for the full game info of the
// match identified by matchID, conclusionID and tokenID. The answer arrives
// asynchronously through HandleGCPacket. It fails only when the GC is not
// ready.
func (dml *DemoMatchLoader) requestDemoInfo(matchID, conclusionID uint64, tokenID uint32) error {
	if !dml.GCReady {
		return fmt.Errorf("gc not ready")
	}

	request := &protobuf.CMsgGCCStrike15V2_MatchListRequestFullGameInfo{
		Matchid:   &matchID,
		Outcomeid: &conclusionID,
		Token:     &tokenID,
	}
	requestType := uint32(protobuf.ECsgoGCMsg_k_EMsgGCCStrike15_v2_MatchListRequestFullGameInfo)
	dml.client.GC.Write(gamecoordinator.NewGCMsgProtobuf(APPID, requestType, request))

	return nil
}
// demoWorker releases the lock of every demo the parser reports on its Done
// channel and returns once that channel is closed.
func (dml *DemoMatchLoader) demoWorker() {
	for {
		finished, open := <-dml.dp.Done
		if !open {
			return
		}
		dml.unlockDemo(finished)
	}
}
// handleDemo loads the match behind demo.ShareCode and queues its demo for
// parsing.
//
// Flow:
//  1. While the GC is not ready the demo is re-queued after a short pause.
//  2. A match that is already stored is skipped, unless it is unparsed and
//     younger than 30 days, in which case only parsing is queued.
//  3. Otherwise the match details are requested from the GC and written in
//     one transaction: players, the match, per-player stats (including
//     multi-kill counters) and refreshed win/loss/tie totals.
//  4. After the commit the demo is handed to the parser.
//
// apiKey and rl are forwarded to utils.Player. The demo's lock is released
// when the function returns.
func (dml *DemoMatchLoader) handleDemo(demo *Demo, apiKey string, rl *rate.Limiter) error {
	// NOTE(review): this also releases the lock on the postpone path below,
	// while the demo sits in the queue again — confirm that this is intended.
	defer dml.unlockDemo(demo)

	if !dml.GCReady {
		log.Infof("[DL] Postponing match %d (%s): GC not ready", demo.MatchID, demo.ShareCode)
		time.Sleep(5 * time.Second)
		dml.parseDemo <- demo
		return nil
	}

	matchID, _, _, err := DecodeSharecode(demo.ShareCode)
	if err != nil {
		return fmt.Errorf("error decoding sharecode %s: %w", demo.ShareCode, err)
	}
	if matchID == 0 {
		// Reported separately: wrapping a nil error with %w would render as
		// "%!w(<nil>)".
		return fmt.Errorf("error decoding sharecode %s: match id is 0", demo.ShareCode)
	}

	iMatch, err := dml.db.Match.Get(context.Background(), matchID)
	if err != nil && !ent.IsNotFound(err) {
		return fmt.Errorf("error looking up match: %w", err)
	}
	if err == nil {
		if !iMatch.DemoParsed && iMatch.Date.After(time.Now().UTC().AddDate(0, 0, -30)) {
			log.Infof("[DL] Match %d is loaded, but not parsed. Try parsing.", iMatch.ID)
			demo.MatchID = matchID
			demo.URL = iMatch.ReplayURL
			demo.DecryptionKey = iMatch.DecryptionKey
			if err := dml.dp.ParseDemo(demo); err != nil {
				return fmt.Errorf("error parsing match %d: %w", demo.MatchID, err)
			}
			return nil
		}

		log.Infof("[DL] Skipped match %d: already loaded", matchID)
		return nil
	}

	log.Infof("[DL] Requesting match %d from GC", matchID)
	requestedAt := time.Now()
	matchDetails, err := dml.getMatchDetails(demo.ShareCode)
	if err != nil {
		// demo.MatchID is not populated yet at this point, so the decoded id
		// is reported instead.
		return fmt.Errorf("error getting match-details for %d: %w", matchID, err)
	}
	log.Infof("[DL] Received matchdetails for match %d (%s)", matchID, time.Since(requestedAt))

	// Validate the response before opening a transaction: every index used
	// further down must exist, otherwise a short answer would panic the worker.
	matches := matchDetails.GetMatches()
	if len(matches) == 0 {
		return fmt.Errorf("gc returned no match for %d", matchID)
	}
	matchZero := matches[0]
	rounds := matchZero.GetRoundstatsall()
	if len(rounds) == 0 {
		return fmt.Errorf("gc returned no round stats for match %d", matchID)
	}
	lastRound := rounds[len(rounds)-1]
	teamScores := lastRound.GetTeamScores()
	if len(teamScores) < 2 { //nolint:gomnd
		return fmt.Errorf("gc returned %d team scores for match %d, need 2", len(teamScores), matchID)
	}

	// init tx
	tx, err := dml.db.Tx(context.Background())
	if err != nil {
		return errors.Wrap(err, "error creating transaction")
	}

	accountIDs := lastRound.GetReservation().GetAccountIds()
	players := make([]*ent.Player, 0, len(accountIDs))
	for _, accountID := range accountIDs {
		tPlayer, err := utils.Player(tx.Client(), AccountID2SteamID(accountID), apiKey, rl)
		if err != nil {
			err = utils.Rollback(tx, err)
			return fmt.Errorf("error getting player for steamid %d: %w", AccountID2SteamID(accountID), err)
		}
		players = append(players, tPlayer)
	}

	demo.URL = lastRound.GetMap()
	demo.MatchID = matchZero.GetMatchid()
	demo.DecryptionKey = []byte(strings.ToUpper(fmt.Sprintf("%016x", matchZero.GetWatchablematchinfo().GetClDecryptdataKeyPub())))

	tMatch, err := tx.Match.Create().
		SetID(matchZero.GetMatchid()).
		AddPlayers(players...).
		SetDate(time.Unix(int64(matchZero.GetMatchtime()), 0).UTC()).
		SetMaxRounds(int(lastRound.GetMaxRounds())).
		SetDuration(int(lastRound.GetMatchDuration())).
		SetShareCode(demo.ShareCode).
		SetReplayURL(lastRound.GetMap()).
		SetScoreTeamA(int(teamScores[0])).
		SetScoreTeamB(int(teamScores[1])).
		SetMatchResult(int(lastRound.GetMatchResult())).
		SetDecryptionKey(demo.DecryptionKey).
		Save(context.Background())
	if err != nil {
		err = utils.Rollback(tx, err)
		return fmt.Errorf("error creating match %d: %w", matchZero.GetMatchid(), err)
	}

	for i, mPlayer := range players {
		// The first five reservation slots are stored as team 1, the rest as team 2.
		teamID := 1
		if i > 4 { //nolint:gomnd
			teamID = 2
		}

		// Multi-kills are derived from the kill difference between
		// consecutive round entries.
		var (
			mk2, mk3, mk4, mk5 uint
			oldKills           int32
		)
		for _, round := range rounds {
			kills, _, _, _, _, _ := playerStatsFromRound(round, mPlayer)
			switch kills - oldKills {
			case 2:
				mk2++
			case 3:
				mk3++
			case 4:
				mk4++
			case 5:
				mk5++
			}
			oldKills = kills
		}

		kills, deaths, assists, hs, score, mvp := playerStatsFromRound(lastRound, mPlayer)
		err = tx.MatchPlayer.Create().
			SetMatches(tMatch).
			SetPlayers(mPlayer).
			SetTeamID(teamID).
			SetKills(int(kills)).
			SetDeaths(int(deaths)).
			SetAssists(int(assists)).
			SetMvp(uint(mvp)).
			SetScore(int(score)).
			SetHeadshot(int(hs)).
			SetMk2(mk2).
			SetMk3(mk3).
			SetMk4(mk4).
			SetMk5(mk5).
			Exec(context.Background())
		if err != nil {
			err = utils.Rollback(tx, err)
			return fmt.Errorf("error creating stats for player %d in match %d: %w", mPlayer.ID, tMatch.ID, err)
		}
	}

	// clear cache or regen values player
	for _, p := range players {
		cacheKey := fmt.Sprintf(utils.SideMetaCacheKey, p.ID)
		err = dml.cache.Delete(context.Background(), cacheKey)
		if err != nil {
			err = utils.Rollback(tx, err)
			return fmt.Errorf("error deleting cache key %s: %w", cacheKey, err)
		}

		wins, losses, ties, err := utils.GetWinLossTieForPlayer(p)
		if err != nil {
			err = utils.Rollback(tx, err)
			return fmt.Errorf("error calculating WinLossTie for player %d: %w", p.ID, err)
		}

		err = p.Update().SetWins(wins).SetTies(ties).SetLooses(losses).Exec(context.Background())
		if err != nil {
			err = utils.Rollback(tx, err)
			return fmt.Errorf("error saving WinLossTie for player %d: %w", p.ID, err)
		}
	}

	err = tx.Commit()
	if err != nil {
		return errors.Wrap(err, "[DP] error committing to db")
	}

	err = dml.dp.ParseDemo(demo)
	if err != nil {
		return fmt.Errorf("error queueing demo %d for parsing: %w", demo.MatchID, err)
	}

	return nil
}
// gcWorker handles queued demos one at a time until parseDemo is closed.
// Failures are logged and do not stop the worker. apiKey and rl are passed
// on to handleDemo.
func (dml *DemoMatchLoader) gcWorker(apiKey string, rl *rate.Limiter) {
	for queued := range dml.parseDemo {
		if err := dml.handleDemo(queued, apiKey, rl); err != nil {
			log.Warningf("[DL] Error handling demo: %v", err)
		}
	}
}