From abd8b74c08b99247424c72b8a21e1dc55d572a76 Mon Sep 17 00:00:00 2001 From: Giovanni Harting <539@idlegandalf.com> Date: Thu, 3 Nov 2022 03:18:24 +0100 Subject: [PATCH] switched to transactional queries for demo loading & parsing --- csgo/demo_loader.go | 79 +++++++++++++++++++++++++-------------------- csgo/demo_parser.go | 71 ++++++++++++++++++++++++++++++++-------- utils/utils.go | 10 ++++-- 3 files changed, 109 insertions(+), 51 deletions(-) diff --git a/csgo/demo_loader.go b/csgo/demo_loader.go index 4922bd1..008f653 100644 --- a/csgo/demo_loader.go +++ b/csgo/demo_loader.go @@ -11,11 +11,11 @@ import ( "github.com/an0nfunc/go-steam/v3/protocol/gamecoordinator" "github.com/an0nfunc/go-steam/v3/protocol/steamlang" "github.com/go-redis/cache/v8" + "github.com/pkg/errors" "github.com/sethvargo/go-retry" log "github.com/sirupsen/logrus" "golang.org/x/time/rate" "google.golang.org/protobuf/proto" - "io/ioutil" "os" "strings" "sync" @@ -44,22 +44,23 @@ type DemoMatchLoaderConfig struct { } type DemoMatchLoader struct { - client *steam.Client - GCReady bool - steamLogin *steam.LogOnDetails - matchRecv chan *protobuf.CMsgGCCStrike15V2_MatchList - cmList []*netutil.PortAddr - sentryFile string - loginKey string - db *ent.Client - dp *DemoParser - parseDemo chan *Demo - parseMap map[string]bool - parseMapL *sync.RWMutex - cache *cache.Cache - connectionWait retry.Backoff - connectFeedback chan int - LoggedIn bool + client *steam.Client + GCReady bool + steamLogin *steam.LogOnDetails + matchRecv chan *protobuf.CMsgGCCStrike15V2_MatchList + cmList []*netutil.PortAddr + sentryFile string + loginKey string + db *ent.Client + dp *DemoParser + parseDemo chan *Demo + parseMap map[string]bool + parseMapL *sync.RWMutex + cache *cache.Cache + connectionWait retry.Backoff + connectionWaitTmpl retry.Backoff + connectFeedback chan int + LoggedIn bool } func AccountId2SteamId(accId uint32) uint64 { @@ -115,7 +116,7 @@ func (dml *DemoMatchLoader) HandleGCPacket(pkg *gamecoordinator.GCPacket) { msg := &protobuf.CMsgConnectionStatus{} err := proto.Unmarshal(pkg.Body, msg) if err != nil { - log.Errorf("[DL] Unable to unmarshal event %v: %v", pkg.MsgType, err) + log.Errorf("[GC] Unable to unmarshal event %v: %v", pkg.MsgType, err) } log.Debugf("[GC] Status: %+v", msg) @@ -127,7 +128,7 @@ func (dml *DemoMatchLoader) HandleGCPacket(pkg *gamecoordinator.GCPacket) { msg := &protobuf.GlobalStatistics{} err := proto.Unmarshal(pkg.Body, msg) if err != nil { - log.Errorf("[DL] Unable to unmarshal event %v: %v", pkg.MsgType, err) + log.Errorf("[GC] Unable to unmarshal event %v: %v", pkg.MsgType, err) } log.Debugf("[GC] Stats: %+v", msg) dml.GCReady = true @@ -135,11 +136,11 @@ func (dml *DemoMatchLoader) HandleGCPacket(pkg *gamecoordinator.GCPacket) { msg := &protobuf.CMsgGCCStrike15V2_MatchList{} err := proto.Unmarshal(pkg.Body, msg) if err != nil { - log.Errorf("[DL] Unable to unmarshal event %v: %v", pkg.MsgType, err) + log.Errorf("[GC] Unable to unmarshal event %v: %v", pkg.MsgType, err) } dml.matchRecv <- msg default: - log.Debugf("[GC] Unhandled GC message: %+v", pkg) + log.Debugf("[GC] Unhandled message: %+v", pkg) } } @@ -190,8 +191,7 @@ func (dml *DemoMatchLoader) Setup(config *DemoMatchLoaderConfig) error { dml.parseMapL = new(sync.RWMutex) dml.cache = config.Cache dml.connectFeedback = make(chan int, 10) - dml.connectionWait = retry.NewExponential(time.Second) - dml.connectionWait = retry.WithCappedDuration(time.Minute*time.Duration(config.RetryTimeout), dml.connectionWait) + dml.connectionWait = retry.WithCappedDuration(time.Minute*time.Duration(config.RetryTimeout), retry.NewExponential(time.Minute)) err := dml.dp.Setup(config.Db, config.Worker, config.SprayTimeout) if err != nil { return err @@ -204,7 +204,7 @@ func (dml *DemoMatchLoader) Setup(config *DemoMatchLoaderConfig) error { dml.steamLogin.ShouldRememberPassword = true if _, err := os.Stat(dml.sentryFile); err == nil { - hash, err := ioutil.ReadFile(dml.sentryFile) + hash, err := os.ReadFile(dml.sentryFile) if err != nil { return err } @@ -212,7 +212,7 @@ func (dml *DemoMatchLoader) Setup(config *DemoMatchLoaderConfig) error { } if _, err := os.Stat(dml.loginKey); err == nil { - hash, err := ioutil.ReadFile(dml.loginKey) + hash, err := os.ReadFile(dml.loginKey) if err != nil { return err } @@ -264,6 +264,8 @@ func (dml *DemoMatchLoader) connectLoop() { case LoginFailed: if sleep, ok := dml.connectionWait.Next(); ok { time.Sleep(sleep) + } else { + panic("retry should never be stop") } if !dml.LoggedIn { log.Infof("[DL] Connecting to steam...") @@ -274,8 +276,8 @@ func (dml *DemoMatchLoader) connectLoop() { } } case LoginSuccess: - log.Infof("[DL] Steam login successfully restored after %d minutes", dml.connectionWait) - dml.connectionWait = retry.NewExponential(time.Minute) + log.Info("[DL] Steam login successfully restored") + dml.connectionWait = dml.connectionWaitTmpl } } } @@ -289,7 +291,7 @@ func (dml *DemoMatchLoader) steamEventHandler() { dml.client.Auth.LogOn(dml.steamLogin) case *steam.MachineAuthUpdateEvent: log.Debug("[DL] Got sentry!") - err := ioutil.WriteFile(dml.sentryFile, e.Hash, os.ModePerm) + err := os.WriteFile(dml.sentryFile, e.Hash, os.ModePerm) if err != nil { log.Errorf("[DL] Unable write sentry file: %v", err) } @@ -320,7 +322,7 @@ func (dml *DemoMatchLoader) steamEventHandler() { dml.connectFeedback <- LoginFailed case *steam.LoginKeyEvent: log.Debug("Got login_key!") - err := ioutil.WriteFile(dml.loginKey, []byte(e.LoginKey), os.ModePerm) + err := os.WriteFile(dml.loginKey, []byte(e.LoginKey), os.ModePerm) if err != nil { log.Errorf("[DL] Unable write login_key: %v", err) } @@ -414,13 +416,20 @@ func (dml *DemoMatchLoader) handleDemo(demo *Demo, apiKey string, rl *rate.Limit log.Infof("[DL] Recieved matchdetails for match %d (%s)", matchId, time.Since(t)) + // init tx + tx, err := dml.db.Tx(context.Background()) + if err != nil { + return errors.Wrap(err, "error creating transaction") + } + matchZero := matchDetails.GetMatches()[0] lastRound := matchZero.GetRoundstatsall()[len(matchZero.Roundstatsall)-1] var players []*ent.Player for _, accountId := range lastRound.GetReservation().GetAccountIds() { - tPlayer, err := utils.Player(dml.db, AccountId2SteamId(accountId), apiKey, rl) + tPlayer, err := utils.Player(tx.Client(), AccountId2SteamId(accountId), apiKey, rl) if err != nil { + err = utils.Rollback(tx, err) return fmt.Errorf("error getting player for steamid %d: %w", AccountId2SteamId(accountId), err) } players = append(players, tPlayer) @@ -430,7 +439,7 @@ func (dml *DemoMatchLoader) handleDemo(demo *Demo, apiKey string, rl *rate.Limit demo.MatchId = matchZero.GetMatchid() demo.DecryptionKey = []byte(strings.ToUpper(fmt.Sprintf("%016x", matchZero.GetWatchablematchinfo().GetClDecryptdataKeyPub()))) - tMatch, err := dml.db.Match.Create(). + tMatch, err := tx.Match.Create(). SetID(matchZero.GetMatchid()). AddPlayers(players...). SetDate(time.Unix(int64(matchZero.GetMatchtime()), 0).UTC()). @@ -444,6 +453,7 @@ func (dml *DemoMatchLoader) handleDemo(demo *Demo, apiKey string, rl *rate.Limit SetDecryptionKey(demo.DecryptionKey). Save(context.Background()) if err != nil { + err = utils.Rollback(tx, err) return fmt.Errorf("error creating match %d: %w", matchZero.GetMatchid(), err) } @@ -463,9 +473,7 @@ func (dml *DemoMatchLoader) handleDemo(demo *Demo, apiKey string, rl *rate.Limit for _, round := range matchZero.GetRoundstatsall() { kills, _, _, _, _, _ := playerStatsFromRound(round, mPlayer) - killDiff := kills - oldKills - - switch killDiff { + switch kills - oldKills { case 2: mk2++ case 3: @@ -479,7 +487,7 @@ func (dml *DemoMatchLoader) handleDemo(demo *Demo, apiKey string, rl *rate.Limit } kills, deaths, assists, hs, score, mvp := playerStatsFromRound(lastRound, mPlayer) - err := dml.db.MatchPlayer.Create(). + err = tx.MatchPlayer.Create(). SetMatches(tMatch). SetPlayers(mPlayer). SetTeamID(teamId). @@ -495,6 +503,7 @@ func (dml *DemoMatchLoader) handleDemo(demo *Demo, apiKey string, rl *rate.Limit SetMk5(mk5). Exec(context.Background()) if err != nil { + err = utils.Rollback(tx, err) return fmt.Errorf("error creating stats for player %d in match %d: %w", mPlayer.ID, tMatch.ID, err) } } diff --git a/csgo/demo_parser.go b/csgo/demo_parser.go index 4a4eee4..5d83827 100644 --- a/csgo/demo_parser.go +++ b/csgo/demo_parser.go @@ -5,11 +5,13 @@ import ( "compress/bzip2" "context" "encoding/gob" + "errors" "fmt" "git.harting.dev/csgowtf/csgowtfd/ent" "git.harting.dev/csgowtf/csgowtfd/ent/match" "git.harting.dev/csgowtf/csgowtfd/ent/matchplayer" "git.harting.dev/csgowtf/csgowtfd/ent/player" + "git.harting.dev/csgowtf/csgowtfd/utils" "github.com/golang/geo/r2" "github.com/markus-wa/demoinfocs-golang/v3/pkg/demoinfocs" "github.com/markus-wa/demoinfocs-golang/v3/pkg/demoinfocs/common" @@ -173,7 +175,8 @@ func setMatchPlayerColor(matchPlayer *ent.MatchPlayer, demoPlayer *common.Player } matchPlayer.Crosshair = demoPlayer.CrosshairCode() - switch demoPlayer.Color() { + color, _ := demoPlayer.ColorOrErr() + switch color { case common.Yellow: matchPlayer.Color = matchplayer.ColorYellow break @@ -189,10 +192,13 @@ func setMatchPlayerColor(matchPlayer *ent.MatchPlayer, demoPlayer *common.Player case common.Orange: matchPlayer.Color = matchplayer.ColorOrange break + case common.Grey: + matchPlayer.Color = matchplayer.ColorGrey } } func (dp *DemoParser) parseWorker() { +workloop: for demo := range dp.demoQueue { if demo.MatchId == 0 { log.Warningf("[DP] can't parse match %s: no matchid found", demo.ShareCode) @@ -200,14 +206,24 @@ func (dp *DemoParser) parseWorker() { continue } - tMatch, err := dp.db.Match.Get(context.Background(), demo.MatchId) + // init tx + tx, err := dp.db.Tx(context.Background()) if err != nil { + log.Errorf("[DP] error creating transaction: %v", err) + dp.Done <- demo + return + } + + tMatch, err := tx.Match.Get(context.Background(), demo.MatchId) + if err != nil { + err = utils.Rollback(tx, err) log.Errorf("[DP] Unable to get match %d: %v", demo.MatchId, err) dp.Done <- demo continue } if tMatch.DemoParsed { + err = utils.Rollback(tx, err) log.Infof("[DP] skipped already parsed %d", demo.MatchId) dp.Done <- demo continue @@ -216,15 +232,17 @@ func (dp *DemoParser) parseWorker() { startTime := time.Now() fDemo, err := demo.download() if err != nil { - if _, ok := err.(DemoNotFoundError); ok { + if errors.Is(err, DemoNotFoundError{}) { + err = utils.Rollback(tx, err) if tMatch.Date.Before(time.Now().UTC().AddDate(0, 0, -30)) { log.Infof("[DP] demo expired for match %d", tMatch.ID) } else { - log.Infof("[DP] demo 404 not found for match %d. Trying again later.", demo.MatchId) + log.Infof("[DP] demo 404 not found for match %d. Retrying later.", demo.MatchId) } dp.Done <- demo continue } else { + err = utils.Rollback(tx, err) log.Errorf("[DP] Unable to download demo for %d: %v", demo.MatchId, err) dp.Done <- demo continue @@ -234,6 +252,7 @@ func (dp *DemoParser) parseWorker() { tStats, err := tMatch.QueryStats().WithPlayers().All(context.Background()) if err != nil { + err = utils.Rollback(tx, err) log.Errorf("[DP] Failed to find players for match %d: %v", demo.MatchId, err) dp.Done <- demo continue @@ -263,7 +282,6 @@ func (dp *DemoParser) parseWorker() { // onChatMessage demoParser.RegisterEventHandler(func(e events.ChatMessage) { if e.Sender != nil { - gs := demoParser.GameState() tAttacker, err := dp.MatchPlayerBySteamID(tStats, e.Sender.SteamID64) if err != nil { @@ -453,6 +471,7 @@ func (dp *DemoParser) parseWorker() { err = demoParser.ParseToEnd() if err != nil { + err = utils.Rollback(tx, err) log.Errorf("[DP] Error parsing replay: %v", err) dp.Done <- demo continue @@ -464,6 +483,7 @@ func (dp *DemoParser) parseWorker() { SetTickRate(demoParser.TickRate()). Exec(context.Background()) if err != nil { + err = utils.Rollback(tx, err) log.Errorf("[DP] Unable to update match %d in database: %v", demo.MatchId, err) dp.Done <- demo continue @@ -495,21 +515,29 @@ func (dp *DemoParser) parseWorker() { SetDmgEnemy(tMatchPlayer.DmgEnemy). Save(context.Background()) if err != nil { + err = utils.Rollback(tx, err) log.Errorf("[DP] Unable to update stats %d in database: %v", tMatchPlayer.PlayerStats, err) - continue + dp.Done <- demo + continue workloop } for _, eqDmg := range eqMap[tMatchPlayer.PlayerStats] { - err := dp.db.Weapon.Create().SetStat(nMatchPLayer).SetDmg(eqDmg.Dmg).SetVictim(eqDmg.To).SetHitGroup(eqDmg.HitGroup).SetEqType(eqDmg.Eq).Exec(context.Background()) + err = tx.Weapon.Create().SetStat(nMatchPLayer).SetDmg(eqDmg.Dmg).SetVictim(eqDmg.To).SetHitGroup(eqDmg.HitGroup).SetEqType(eqDmg.Eq).Exec(context.Background()) if err != nil { + err = utils.Rollback(tx, err) log.Errorf("[DP] Unable to create WeaponStat: %v", err) + dp.Done <- demo + continue workloop } } for _, eco := range ecoMap[tMatchPlayer.PlayerStats] { - err := dp.db.RoundStats.Create().SetMatchPlayer(nMatchPLayer).SetRound(uint(eco.Round)).SetBank(uint(eco.Bank)).SetEquipment(uint(eco.EqV)).SetSpent(uint(eco.Spent)).Exec(context.Background()) + err := tx.RoundStats.Create().SetMatchPlayer(nMatchPLayer).SetRound(uint(eco.Round)).SetBank(uint(eco.Bank)).SetEquipment(uint(eco.EqV)).SetSpent(uint(eco.Spent)).Exec(context.Background()) if err != nil { + err = utils.Rollback(tx, err) log.Errorf("[DP] Unable to create RoundStat: %v", err) + dp.Done <- demo + continue workloop } } @@ -520,30 +548,45 @@ func (dp *DemoParser) parseWorker() { enc := gob.NewEncoder(sprayBuf) err = enc.Encode(sprayAvg) if err != nil { + err = utils.Rollback(tx, err) log.Warningf("[DP] Failure to encode spray %v as bytes: %v", spray, err) - continue + dp.Done <- demo + continue workloop } - err = dp.db.Spray.Create().SetMatchPlayers(nMatchPLayer).SetWeapon(spray.Weapon).SetSpray(sprayBuf.Bytes()).Exec(context.Background()) + err = tx.Spray.Create().SetMatchPlayers(nMatchPLayer).SetWeapon(spray.Weapon).SetSpray(sprayBuf.Bytes()).Exec(context.Background()) if err != nil { + err = utils.Rollback(tx, err) log.Warningf("[DP] Failure adding spray to database: %v", err) + dp.Done <- demo + continue workloop } } } } - bulk := make([]*ent.MessagesCreate, 0) + var bulk []*ent.MessagesCreate for _, msg := range tMatchPlayer.Edges.Messages { - bulk = append(bulk, dp.db.Messages.Create().SetMessage(msg.Message).SetAllChat(msg.AllChat).SetTick(msg.Tick).SetMatchPlayer(tMatchPlayer)) + bulk = append(bulk, tx.Messages.Create().SetMessage(msg.Message).SetAllChat(msg.AllChat).SetTick(msg.Tick).SetMatchPlayer(tMatchPlayer)) } if len(bulk) > 0 { - err = dp.db.Messages.CreateBulk(bulk...).Exec(context.Background()) + err = tx.Messages.CreateBulk(bulk...).Exec(context.Background()) if err != nil { - log.Warningf("[DP] Failure adding messages to database: %v", err) + err = utils.Rollback(tx, err) + log.Warningf("[DP] Failure adding messages to db: %v", err) + dp.Done <- demo + continue workloop } } } + err = tx.Commit() + if err != nil { + log.Errorf("[DP] error commting to db: %v", err) + dp.Done <- demo + continue + } + log.Infof("[DP] parsed match %d (took %s/%s)", demo.MatchId, downloadTime, time.Since(startTime)) err = demoParser.Close() diff --git a/utils/utils.go b/utils/utils.go index 73d5f0b..cccba82 100644 --- a/utils/utils.go +++ b/utils/utils.go @@ -17,7 +17,6 @@ import ( log "github.com/sirupsen/logrus" "golang.org/x/time/rate" "io" - "io/ioutil" "math" "net/http" "net/url" @@ -578,7 +577,7 @@ func getNextShareCode(lastCode string, apiKey string, authCode string, steamId u defer func(Body io.ReadCloser) { _ = Body.Close() }(r.Body) - bJson, err := ioutil.ReadAll(r.Body) + bJson, err := io.ReadAll(r.Body) if err != nil { return "", err } @@ -823,3 +822,10 @@ func RealIP(header *http.Header, fallback string) string { return fallback } } + +func Rollback(tx *ent.Tx, err error) error { + if rErr := tx.Rollback(); rErr != nil { + err = fmt.Errorf("%w: %v", err, rErr) + } + return err +}