switched to transactional queries for demo loading & parsing

This commit is contained in:
2022-11-03 03:18:24 +01:00
parent ff3bbe0037
commit abd8b74c08
3 changed files with 109 additions and 51 deletions

View File

@@ -11,11 +11,11 @@ import (
"github.com/an0nfunc/go-steam/v3/protocol/gamecoordinator"
"github.com/an0nfunc/go-steam/v3/protocol/steamlang"
"github.com/go-redis/cache/v8"
"github.com/pkg/errors"
"github.com/sethvargo/go-retry"
log "github.com/sirupsen/logrus"
"golang.org/x/time/rate"
"google.golang.org/protobuf/proto"
"io/ioutil"
"os"
"strings"
"sync"
@@ -44,22 +44,23 @@ type DemoMatchLoaderConfig struct {
}
type DemoMatchLoader struct {
client *steam.Client
GCReady bool
steamLogin *steam.LogOnDetails
matchRecv chan *protobuf.CMsgGCCStrike15V2_MatchList
cmList []*netutil.PortAddr
sentryFile string
loginKey string
db *ent.Client
dp *DemoParser
parseDemo chan *Demo
parseMap map[string]bool
parseMapL *sync.RWMutex
cache *cache.Cache
connectionWait retry.Backoff
connectFeedback chan int
LoggedIn bool
client *steam.Client
GCReady bool
steamLogin *steam.LogOnDetails
matchRecv chan *protobuf.CMsgGCCStrike15V2_MatchList
cmList []*netutil.PortAddr
sentryFile string
loginKey string
db *ent.Client
dp *DemoParser
parseDemo chan *Demo
parseMap map[string]bool
parseMapL *sync.RWMutex
cache *cache.Cache
connectionWait retry.Backoff
connectionWaitTmpl retry.Backoff
connectFeedback chan int
LoggedIn bool
}
func AccountId2SteamId(accId uint32) uint64 {
@@ -115,7 +116,7 @@ func (dml *DemoMatchLoader) HandleGCPacket(pkg *gamecoordinator.GCPacket) {
msg := &protobuf.CMsgConnectionStatus{}
err := proto.Unmarshal(pkg.Body, msg)
if err != nil {
log.Errorf("[DL] Unable to unmarshal event %v: %v", pkg.MsgType, err)
log.Errorf("[GC] Unable to unmarshal event %v: %v", pkg.MsgType, err)
}
log.Debugf("[GC] Status: %+v", msg)
@@ -127,7 +128,7 @@ func (dml *DemoMatchLoader) HandleGCPacket(pkg *gamecoordinator.GCPacket) {
msg := &protobuf.GlobalStatistics{}
err := proto.Unmarshal(pkg.Body, msg)
if err != nil {
log.Errorf("[DL] Unable to unmarshal event %v: %v", pkg.MsgType, err)
log.Errorf("[GC] Unable to unmarshal event %v: %v", pkg.MsgType, err)
}
log.Debugf("[GC] Stats: %+v", msg)
dml.GCReady = true
@@ -135,11 +136,11 @@ func (dml *DemoMatchLoader) HandleGCPacket(pkg *gamecoordinator.GCPacket) {
msg := &protobuf.CMsgGCCStrike15V2_MatchList{}
err := proto.Unmarshal(pkg.Body, msg)
if err != nil {
log.Errorf("[DL] Unable to unmarshal event %v: %v", pkg.MsgType, err)
log.Errorf("[GC] Unable to unmarshal event %v: %v", pkg.MsgType, err)
}
dml.matchRecv <- msg
default:
log.Debugf("[GC] Unhandled GC message: %+v", pkg)
log.Debugf("[GC] Unhandled message: %+v", pkg)
}
}
@@ -190,8 +191,7 @@ func (dml *DemoMatchLoader) Setup(config *DemoMatchLoaderConfig) error {
dml.parseMapL = new(sync.RWMutex)
dml.cache = config.Cache
dml.connectFeedback = make(chan int, 10)
dml.connectionWait = retry.NewExponential(time.Second)
dml.connectionWait = retry.WithCappedDuration(time.Minute*time.Duration(config.RetryTimeout), dml.connectionWait)
dml.connectionWait = retry.WithCappedDuration(time.Minute*time.Duration(config.RetryTimeout), retry.NewExponential(time.Minute))
err := dml.dp.Setup(config.Db, config.Worker, config.SprayTimeout)
if err != nil {
return err
@@ -204,7 +204,7 @@ func (dml *DemoMatchLoader) Setup(config *DemoMatchLoaderConfig) error {
dml.steamLogin.ShouldRememberPassword = true
if _, err := os.Stat(dml.sentryFile); err == nil {
hash, err := ioutil.ReadFile(dml.sentryFile)
hash, err := os.ReadFile(dml.sentryFile)
if err != nil {
return err
}
@@ -212,7 +212,7 @@ func (dml *DemoMatchLoader) Setup(config *DemoMatchLoaderConfig) error {
}
if _, err := os.Stat(dml.loginKey); err == nil {
hash, err := ioutil.ReadFile(dml.loginKey)
hash, err := os.ReadFile(dml.loginKey)
if err != nil {
return err
}
@@ -264,6 +264,8 @@ func (dml *DemoMatchLoader) connectLoop() {
case LoginFailed:
if sleep, ok := dml.connectionWait.Next(); ok {
time.Sleep(sleep)
} else {
panic("retry should never be stop")
}
if !dml.LoggedIn {
log.Infof("[DL] Connecting to steam...")
@@ -274,8 +276,8 @@ func (dml *DemoMatchLoader) connectLoop() {
}
}
case LoginSuccess:
log.Infof("[DL] Steam login successfully restored after %d minutes", dml.connectionWait)
dml.connectionWait = retry.NewExponential(time.Minute)
log.Info("[DL] Steam login successfully restored")
dml.connectionWait = dml.connectionWaitTmpl
}
}
}
@@ -289,7 +291,7 @@ func (dml *DemoMatchLoader) steamEventHandler() {
dml.client.Auth.LogOn(dml.steamLogin)
case *steam.MachineAuthUpdateEvent:
log.Debug("[DL] Got sentry!")
err := ioutil.WriteFile(dml.sentryFile, e.Hash, os.ModePerm)
err := os.WriteFile(dml.sentryFile, e.Hash, os.ModePerm)
if err != nil {
log.Errorf("[DL] Unable write sentry file: %v", err)
}
@@ -320,7 +322,7 @@ func (dml *DemoMatchLoader) steamEventHandler() {
dml.connectFeedback <- LoginFailed
case *steam.LoginKeyEvent:
log.Debug("Got login_key!")
err := ioutil.WriteFile(dml.loginKey, []byte(e.LoginKey), os.ModePerm)
err := os.WriteFile(dml.loginKey, []byte(e.LoginKey), os.ModePerm)
if err != nil {
log.Errorf("[DL] Unable write login_key: %v", err)
}
@@ -414,13 +416,20 @@ func (dml *DemoMatchLoader) handleDemo(demo *Demo, apiKey string, rl *rate.Limit
log.Infof("[DL] Recieved matchdetails for match %d (%s)", matchId, time.Since(t))
// init tx
tx, err := dml.db.Tx(context.Background())
if err != nil {
return errors.Wrap(err, "error creating transaction")
}
matchZero := matchDetails.GetMatches()[0]
lastRound := matchZero.GetRoundstatsall()[len(matchZero.Roundstatsall)-1]
var players []*ent.Player
for _, accountId := range lastRound.GetReservation().GetAccountIds() {
tPlayer, err := utils.Player(dml.db, AccountId2SteamId(accountId), apiKey, rl)
tPlayer, err := utils.Player(tx.Client(), AccountId2SteamId(accountId), apiKey, rl)
if err != nil {
err = utils.Rollback(tx, err)
return fmt.Errorf("error getting player for steamid %d: %w", AccountId2SteamId(accountId), err)
}
players = append(players, tPlayer)
@@ -430,7 +439,7 @@ func (dml *DemoMatchLoader) handleDemo(demo *Demo, apiKey string, rl *rate.Limit
demo.MatchId = matchZero.GetMatchid()
demo.DecryptionKey = []byte(strings.ToUpper(fmt.Sprintf("%016x", matchZero.GetWatchablematchinfo().GetClDecryptdataKeyPub())))
tMatch, err := dml.db.Match.Create().
tMatch, err := tx.Match.Create().
SetID(matchZero.GetMatchid()).
AddPlayers(players...).
SetDate(time.Unix(int64(matchZero.GetMatchtime()), 0).UTC()).
@@ -444,6 +453,7 @@ func (dml *DemoMatchLoader) handleDemo(demo *Demo, apiKey string, rl *rate.Limit
SetDecryptionKey(demo.DecryptionKey).
Save(context.Background())
if err != nil {
err = utils.Rollback(tx, err)
return fmt.Errorf("error creating match %d: %w", matchZero.GetMatchid(), err)
}
@@ -463,9 +473,7 @@ func (dml *DemoMatchLoader) handleDemo(demo *Demo, apiKey string, rl *rate.Limit
for _, round := range matchZero.GetRoundstatsall() {
kills, _, _, _, _, _ := playerStatsFromRound(round, mPlayer)
killDiff := kills - oldKills
switch killDiff {
switch kills - oldKills {
case 2:
mk2++
case 3:
@@ -479,7 +487,7 @@ func (dml *DemoMatchLoader) handleDemo(demo *Demo, apiKey string, rl *rate.Limit
}
kills, deaths, assists, hs, score, mvp := playerStatsFromRound(lastRound, mPlayer)
err := dml.db.MatchPlayer.Create().
err = tx.MatchPlayer.Create().
SetMatches(tMatch).
SetPlayers(mPlayer).
SetTeamID(teamId).
@@ -495,6 +503,7 @@ func (dml *DemoMatchLoader) handleDemo(demo *Demo, apiKey string, rl *rate.Limit
SetMk5(mk5).
Exec(context.Background())
if err != nil {
err = utils.Rollback(tx, err)
return fmt.Errorf("error creating stats for player %d in match %d: %w", mPlayer.ID, tMatch.ID, err)
}
}

View File

@@ -5,11 +5,13 @@ import (
"compress/bzip2"
"context"
"encoding/gob"
"errors"
"fmt"
"git.harting.dev/csgowtf/csgowtfd/ent"
"git.harting.dev/csgowtf/csgowtfd/ent/match"
"git.harting.dev/csgowtf/csgowtfd/ent/matchplayer"
"git.harting.dev/csgowtf/csgowtfd/ent/player"
"git.harting.dev/csgowtf/csgowtfd/utils"
"github.com/golang/geo/r2"
"github.com/markus-wa/demoinfocs-golang/v3/pkg/demoinfocs"
"github.com/markus-wa/demoinfocs-golang/v3/pkg/demoinfocs/common"
@@ -173,7 +175,8 @@ func setMatchPlayerColor(matchPlayer *ent.MatchPlayer, demoPlayer *common.Player
}
matchPlayer.Crosshair = demoPlayer.CrosshairCode()
switch demoPlayer.Color() {
color, _ := demoPlayer.ColorOrErr()
switch color {
case common.Yellow:
matchPlayer.Color = matchplayer.ColorYellow
break
@@ -189,10 +192,13 @@ func setMatchPlayerColor(matchPlayer *ent.MatchPlayer, demoPlayer *common.Player
case common.Orange:
matchPlayer.Color = matchplayer.ColorOrange
break
case common.Grey:
matchPlayer.Color = matchplayer.ColorGrey
}
}
func (dp *DemoParser) parseWorker() {
workloop:
for demo := range dp.demoQueue {
if demo.MatchId == 0 {
log.Warningf("[DP] can't parse match %s: no matchid found", demo.ShareCode)
@@ -200,14 +206,24 @@ func (dp *DemoParser) parseWorker() {
continue
}
tMatch, err := dp.db.Match.Get(context.Background(), demo.MatchId)
// init tx
tx, err := dp.db.Tx(context.Background())
if err != nil {
log.Errorf("[DP] error creating transaction: %v", err)
dp.Done <- demo
return
}
tMatch, err := tx.Match.Get(context.Background(), demo.MatchId)
if err != nil {
err = utils.Rollback(tx, err)
log.Errorf("[DP] Unable to get match %d: %v", demo.MatchId, err)
dp.Done <- demo
continue
}
if tMatch.DemoParsed {
err = utils.Rollback(tx, err)
log.Infof("[DP] skipped already parsed %d", demo.MatchId)
dp.Done <- demo
continue
@@ -216,15 +232,17 @@ func (dp *DemoParser) parseWorker() {
startTime := time.Now()
fDemo, err := demo.download()
if err != nil {
if _, ok := err.(DemoNotFoundError); ok {
if errors.Is(err, DemoNotFoundError{}) {
err = utils.Rollback(tx, err)
if tMatch.Date.Before(time.Now().UTC().AddDate(0, 0, -30)) {
log.Infof("[DP] demo expired for match %d", tMatch.ID)
} else {
log.Infof("[DP] demo 404 not found for match %d. Trying again later.", demo.MatchId)
log.Infof("[DP] demo 404 not found for match %d. Retrying later.", demo.MatchId)
}
dp.Done <- demo
continue
} else {
err = utils.Rollback(tx, err)
log.Errorf("[DP] Unable to download demo for %d: %v", demo.MatchId, err)
dp.Done <- demo
continue
@@ -234,6 +252,7 @@ func (dp *DemoParser) parseWorker() {
tStats, err := tMatch.QueryStats().WithPlayers().All(context.Background())
if err != nil {
err = utils.Rollback(tx, err)
log.Errorf("[DP] Failed to find players for match %d: %v", demo.MatchId, err)
dp.Done <- demo
continue
@@ -263,7 +282,6 @@ func (dp *DemoParser) parseWorker() {
// onChatMessage
demoParser.RegisterEventHandler(func(e events.ChatMessage) {
if e.Sender != nil {
gs := demoParser.GameState()
tAttacker, err := dp.MatchPlayerBySteamID(tStats, e.Sender.SteamID64)
if err != nil {
@@ -453,6 +471,7 @@ func (dp *DemoParser) parseWorker() {
err = demoParser.ParseToEnd()
if err != nil {
err = utils.Rollback(tx, err)
log.Errorf("[DP] Error parsing replay: %v", err)
dp.Done <- demo
continue
@@ -464,6 +483,7 @@ func (dp *DemoParser) parseWorker() {
SetTickRate(demoParser.TickRate()).
Exec(context.Background())
if err != nil {
err = utils.Rollback(tx, err)
log.Errorf("[DP] Unable to update match %d in database: %v", demo.MatchId, err)
dp.Done <- demo
continue
@@ -495,21 +515,29 @@ func (dp *DemoParser) parseWorker() {
SetDmgEnemy(tMatchPlayer.DmgEnemy).
Save(context.Background())
if err != nil {
err = utils.Rollback(tx, err)
log.Errorf("[DP] Unable to update stats %d in database: %v", tMatchPlayer.PlayerStats, err)
continue
dp.Done <- demo
continue workloop
}
for _, eqDmg := range eqMap[tMatchPlayer.PlayerStats] {
err := dp.db.Weapon.Create().SetStat(nMatchPLayer).SetDmg(eqDmg.Dmg).SetVictim(eqDmg.To).SetHitGroup(eqDmg.HitGroup).SetEqType(eqDmg.Eq).Exec(context.Background())
err = tx.Weapon.Create().SetStat(nMatchPLayer).SetDmg(eqDmg.Dmg).SetVictim(eqDmg.To).SetHitGroup(eqDmg.HitGroup).SetEqType(eqDmg.Eq).Exec(context.Background())
if err != nil {
err = utils.Rollback(tx, err)
log.Errorf("[DP] Unable to create WeaponStat: %v", err)
dp.Done <- demo
continue workloop
}
}
for _, eco := range ecoMap[tMatchPlayer.PlayerStats] {
err := dp.db.RoundStats.Create().SetMatchPlayer(nMatchPLayer).SetRound(uint(eco.Round)).SetBank(uint(eco.Bank)).SetEquipment(uint(eco.EqV)).SetSpent(uint(eco.Spent)).Exec(context.Background())
err := tx.RoundStats.Create().SetMatchPlayer(nMatchPLayer).SetRound(uint(eco.Round)).SetBank(uint(eco.Bank)).SetEquipment(uint(eco.EqV)).SetSpent(uint(eco.Spent)).Exec(context.Background())
if err != nil {
err = utils.Rollback(tx, err)
log.Errorf("[DP] Unable to create RoundStat: %v", err)
dp.Done <- demo
continue workloop
}
}
@@ -520,30 +548,45 @@ func (dp *DemoParser) parseWorker() {
enc := gob.NewEncoder(sprayBuf)
err = enc.Encode(sprayAvg)
if err != nil {
err = utils.Rollback(tx, err)
log.Warningf("[DP] Failure to encode spray %v as bytes: %v", spray, err)
continue
dp.Done <- demo
continue workloop
}
err = dp.db.Spray.Create().SetMatchPlayers(nMatchPLayer).SetWeapon(spray.Weapon).SetSpray(sprayBuf.Bytes()).Exec(context.Background())
err = tx.Spray.Create().SetMatchPlayers(nMatchPLayer).SetWeapon(spray.Weapon).SetSpray(sprayBuf.Bytes()).Exec(context.Background())
if err != nil {
err = utils.Rollback(tx, err)
log.Warningf("[DP] Failure adding spray to database: %v", err)
dp.Done <- demo
continue workloop
}
}
}
}
bulk := make([]*ent.MessagesCreate, 0)
var bulk []*ent.MessagesCreate
for _, msg := range tMatchPlayer.Edges.Messages {
bulk = append(bulk, dp.db.Messages.Create().SetMessage(msg.Message).SetAllChat(msg.AllChat).SetTick(msg.Tick).SetMatchPlayer(tMatchPlayer))
bulk = append(bulk, tx.Messages.Create().SetMessage(msg.Message).SetAllChat(msg.AllChat).SetTick(msg.Tick).SetMatchPlayer(tMatchPlayer))
}
if len(bulk) > 0 {
err = dp.db.Messages.CreateBulk(bulk...).Exec(context.Background())
err = tx.Messages.CreateBulk(bulk...).Exec(context.Background())
if err != nil {
log.Warningf("[DP] Failure adding messages to database: %v", err)
err = utils.Rollback(tx, err)
log.Warningf("[DP] Failure adding messages to db: %v", err)
dp.Done <- demo
continue workloop
}
}
}
err = tx.Commit()
if err != nil {
log.Errorf("[DP] error commting to db: %v", err)
dp.Done <- demo
continue
}
log.Infof("[DP] parsed match %d (took %s/%s)", demo.MatchId, downloadTime, time.Since(startTime))
err = demoParser.Close()