reworked preventing parsing one match multiple times

This commit is contained in:
2022-02-04 05:43:29 +01:00
parent 6a4e2f3ee3
commit 466a000c86
3 changed files with 161 additions and 144 deletions

View File

@@ -51,8 +51,8 @@ type DemoMatchLoader struct {
db *ent.Client db *ent.Client
dp *DemoParser dp *DemoParser
parseDemo chan *Demo parseDemo chan *Demo
ParseMap map[string]bool parseMap map[string]bool
ParseMapL *sync.Mutex parseMapL *sync.RWMutex
cache *cache.Cache cache *cache.Cache
connecting bool connecting bool
} }
@@ -75,7 +75,28 @@ func playerStatsFromRound(round *protobuf.CMsgGCCStrike15V2_MatchmakingServerRou
return 0, 0, 0, 0, 0, 0 return 0, 0, 0, 0, 0, 0
} }
func (d *DemoMatchLoader) HandleGCPacket(pkg *gamecoordinator.GCPacket) { func (dml *DemoMatchLoader) IsLoading(demo *Demo) bool {
dml.parseMapL.RLock()
defer dml.parseMapL.RUnlock()
if _, ok := dml.parseMap[demo.ShareCode]; ok {
return true
}
return false
}
func (dml *DemoMatchLoader) unlockDemo(demo *Demo) {
dml.parseMapL.Lock()
defer dml.parseMapL.Unlock()
delete(dml.parseMap, demo.ShareCode)
}
func (dml *DemoMatchLoader) lockDemo(demo *Demo) {
dml.parseMapL.Lock()
defer dml.parseMapL.Unlock()
dml.parseMap[demo.ShareCode] = true
}
func (dml *DemoMatchLoader) HandleGCPacket(pkg *gamecoordinator.GCPacket) {
switch pkg.MsgType { switch pkg.MsgType {
case uint32(protobuf.EGCBaseClientMsg_k_EMsgGCClientWelcome): case uint32(protobuf.EGCBaseClientMsg_k_EMsgGCClientWelcome):
msg := &protobuf.CMsgClientWelcome{} msg := &protobuf.CMsgClientWelcome{}
@@ -84,7 +105,7 @@ func (d *DemoMatchLoader) HandleGCPacket(pkg *gamecoordinator.GCPacket) {
log.Errorf("[DL] Unable to unmarshal event %v: %v", pkg.MsgType, err) log.Errorf("[DL] Unable to unmarshal event %v: %v", pkg.MsgType, err)
} }
log.Debugf("[GC] Welcome: %+v", msg) log.Debugf("[GC] Welcome: %+v", msg)
d.GCReady = true dml.GCReady = true
case uint32(protobuf.EGCBaseClientMsg_k_EMsgGCClientConnectionStatus): case uint32(protobuf.EGCBaseClientMsg_k_EMsgGCClientConnectionStatus):
msg := &protobuf.CMsgConnectionStatus{} msg := &protobuf.CMsgConnectionStatus{}
err := proto.Unmarshal(pkg.Body, msg) err := proto.Unmarshal(pkg.Body, msg)
@@ -94,8 +115,8 @@ func (d *DemoMatchLoader) HandleGCPacket(pkg *gamecoordinator.GCPacket) {
log.Debugf("[GC] Status: %+v", msg) log.Debugf("[GC] Status: %+v", msg)
if msg.GetStatus() != protobuf.GCConnectionStatus_GCConnectionStatus_HAVE_SESSION { if msg.GetStatus() != protobuf.GCConnectionStatus_GCConnectionStatus_HAVE_SESSION {
d.GCReady = false dml.GCReady = false
go d.greetGC() go dml.greetGC()
} }
case uint32(protobuf.ECsgoGCMsg_k_EMsgGCCStrike15_v2_GC2ClientGlobalStats): case uint32(protobuf.ECsgoGCMsg_k_EMsgGCCStrike15_v2_GC2ClientGlobalStats):
msg := &protobuf.GlobalStatistics{} msg := &protobuf.GlobalStatistics{}
@@ -104,21 +125,21 @@ func (d *DemoMatchLoader) HandleGCPacket(pkg *gamecoordinator.GCPacket) {
log.Errorf("[DL] Unable to unmarshal event %v: %v", pkg.MsgType, err) log.Errorf("[DL] Unable to unmarshal event %v: %v", pkg.MsgType, err)
} }
log.Debugf("[GC] Stats: %+v", msg) log.Debugf("[GC] Stats: %+v", msg)
d.GCReady = true dml.GCReady = true
case uint32(protobuf.ECsgoGCMsg_k_EMsgGCCStrike15_v2_MatchList): case uint32(protobuf.ECsgoGCMsg_k_EMsgGCCStrike15_v2_MatchList):
msg := &protobuf.CMsgGCCStrike15V2_MatchList{} msg := &protobuf.CMsgGCCStrike15V2_MatchList{}
err := proto.Unmarshal(pkg.Body, msg) err := proto.Unmarshal(pkg.Body, msg)
if err != nil { if err != nil {
log.Errorf("[DL] Unable to unmarshal event %v: %v", pkg.MsgType, err) log.Errorf("[DL] Unable to unmarshal event %v: %v", pkg.MsgType, err)
} }
d.matchRecv <- msg dml.matchRecv <- msg
default: default:
log.Debugf("[GC] Unhandled GC message: %+v", pkg) log.Debugf("[GC] Unhandled GC message: %+v", pkg)
} }
} }
func (d *DemoMatchLoader) getMatchDetails(sharecode string) (*protobuf.CMsgGCCStrike15V2_MatchList, error) { func (dml *DemoMatchLoader) getMatchDetails(sharecode string) (*protobuf.CMsgGCCStrike15V2_MatchList, error) {
if !d.GCReady { if !dml.GCReady {
return nil, fmt.Errorf("gc not ready") return nil, fmt.Errorf("gc not ready")
} }
@@ -126,142 +147,143 @@ func (d *DemoMatchLoader) getMatchDetails(sharecode string) (*protobuf.CMsgGCCSt
if err != nil { if err != nil {
return nil, err return nil, err
} }
err = d.requestDemoInfo(matchId, outcomeId, uint32(tokenId)) err = dml.requestDemoInfo(matchId, outcomeId, uint32(tokenId))
if err != nil { if err != nil {
return nil, err return nil, err
} }
for { for {
select { select {
case matchDetails := <-d.matchRecv: case matchDetails := <-dml.matchRecv:
if *matchDetails.Matches[0].Matchid == matchId { if *matchDetails.Matches[0].Matchid == matchId {
return matchDetails, nil return matchDetails, nil
} else { } else {
d.matchRecv <- matchDetails dml.matchRecv <- matchDetails
} }
} }
} }
} }
func (d *DemoMatchLoader) connectToSteam() error { func (dml *DemoMatchLoader) connectToSteam() error {
if d.client.Connected() { if dml.client.Connected() {
return nil return nil
} }
_, err := d.client.Connect() _, err := dml.client.Connect()
if err != nil { if err != nil {
return err return err
} }
return nil return nil
} }
func (d *DemoMatchLoader) Setup(config *DemoMatchLoaderConfig) error { func (dml *DemoMatchLoader) Setup(config *DemoMatchLoaderConfig) error {
d.loginKey = config.LoginKey dml.loginKey = config.LoginKey
d.sentryFile = config.Sentry dml.sentryFile = config.Sentry
d.db = config.Db dml.db = config.Db
d.dp = &DemoParser{} dml.dp = &DemoParser{}
d.ParseMap = map[string]bool{} dml.parseMap = map[string]bool{}
d.ParseMapL = new(sync.Mutex) dml.parseMapL = new(sync.RWMutex)
d.cache = config.Cache dml.cache = config.Cache
err := d.dp.Setup(config.Db, config.Worker, config.SprayTimeout) err := dml.dp.Setup(config.Db, config.Worker, config.SprayTimeout)
if err != nil { if err != nil {
return err return err
} }
d.steamLogin = new(steam.LogOnDetails) dml.steamLogin = new(steam.LogOnDetails)
d.steamLogin.Username = config.Username dml.steamLogin.Username = config.Username
d.steamLogin.Password = config.Password dml.steamLogin.Password = config.Password
d.steamLogin.AuthCode = config.AuthCode dml.steamLogin.AuthCode = config.AuthCode
d.steamLogin.ShouldRememberPassword = true dml.steamLogin.ShouldRememberPassword = true
if _, err := os.Stat(d.sentryFile); err == nil { if _, err := os.Stat(dml.sentryFile); err == nil {
hash, err := ioutil.ReadFile(d.sentryFile) hash, err := ioutil.ReadFile(dml.sentryFile)
if err != nil { if err != nil {
return err return err
} }
d.steamLogin.SentryFileHash = hash dml.steamLogin.SentryFileHash = hash
} }
if _, err := os.Stat(d.loginKey); err == nil { if _, err := os.Stat(dml.loginKey); err == nil {
hash, err := ioutil.ReadFile(d.loginKey) hash, err := ioutil.ReadFile(dml.loginKey)
if err != nil { if err != nil {
return err return err
} }
d.steamLogin.LoginKey = string(hash) dml.steamLogin.LoginKey = string(hash)
} }
d.client = steam.NewClient() dml.client = steam.NewClient()
err = steam.InitializeSteamDirectory() err = steam.InitializeSteamDirectory()
if err != nil { if err != nil {
return err return err
} }
d.matchRecv = make(chan *protobuf.CMsgGCCStrike15V2_MatchList, 1000) dml.matchRecv = make(chan *protobuf.CMsgGCCStrike15V2_MatchList, 1000)
d.parseDemo = make(chan *Demo, 1000) dml.parseDemo = make(chan *Demo, 1000)
go d.connectLoop() go dml.connectLoop()
go d.steamEventHandler() go dml.steamEventHandler()
go dml.demoWorker()
for i := 0; i < config.Worker; i++ { for i := 0; i < config.Worker; i++ {
go d.gcWorker(config.ApiKey, config.RateLimit) go dml.gcWorker(config.ApiKey, config.RateLimit)
} }
return nil return nil
} }
func (d DemoMatchLoader) LoadDemo(demo *Demo) error { func (dml DemoMatchLoader) LoadDemo(demo *Demo) error {
select { select {
case d.parseDemo <- demo: case dml.parseDemo <- demo:
return nil return nil
default: default:
return fmt.Errorf("queue full") return fmt.Errorf("queue full")
} }
} }
func (d DemoMatchLoader) connectLoop() { func (dml DemoMatchLoader) connectLoop() {
if !d.connecting { if !dml.connecting {
d.connecting = true dml.connecting = true
for d.connectToSteam() != nil { for dml.connectToSteam() != nil {
log.Infof("[DL] Retrying connecting to steam...") log.Infof("[DL] Retrying connecting to steam...")
time.Sleep(time.Minute * 10) time.Sleep(time.Minute * 10)
} }
} }
} }
func (d *DemoMatchLoader) steamEventHandler() { func (dml *DemoMatchLoader) steamEventHandler() {
for event := range d.client.Events() { for event := range dml.client.Events() {
switch e := event.(type) { switch e := event.(type) {
case *steam.ConnectedEvent: case *steam.ConnectedEvent:
log.Debug("[DL] Connected!") log.Debug("[DL] Connected!")
d.client.Auth.LogOn(d.steamLogin) dml.client.Auth.LogOn(dml.steamLogin)
case *steam.MachineAuthUpdateEvent: case *steam.MachineAuthUpdateEvent:
log.Debug("[DL] Got sentry!") log.Debug("[DL] Got sentry!")
err := ioutil.WriteFile(d.sentryFile, e.Hash, os.ModePerm) err := ioutil.WriteFile(dml.sentryFile, e.Hash, os.ModePerm)
if err != nil { if err != nil {
log.Errorf("[DL] Unable write sentry file: %v", err) log.Errorf("[DL] Unable write sentry file: %v", err)
} }
case *steam.LoggedOnEvent: case *steam.LoggedOnEvent:
log.Debug("[DL] Login successfully!") log.Debug("[DL] Login successfully!")
d.client.Social.SetPersonaState(steamlang.EPersonaState_Online) dml.client.Social.SetPersonaState(steamlang.EPersonaState_Online)
go d.setPlaying() go dml.setPlaying()
case *steam.LogOnFailedEvent: case *steam.LogOnFailedEvent:
log.Warningf("[DL] Steam login denied: %+v", e) log.Warningf("[DL] Steam login denied: %+v", e)
switch e.Result { switch e.Result {
case steamlang.EResult_AccountLogonDenied: case steamlang.EResult_AccountLogonDenied:
log.Fatalf("[DL] Please provide AuthCode in config") log.Fatalf("[DL] Please provide AuthCode in config")
case steamlang.EResult_InvalidPassword: case steamlang.EResult_InvalidPassword:
_ = os.Remove(d.sentryFile) _ = os.Remove(dml.sentryFile)
_ = os.Remove(d.loginKey) _ = os.Remove(dml.loginKey)
log.Warningf("[DL] Steam login wrong") log.Warningf("[DL] Steam login wrong")
go d.connectLoop() go dml.connectLoop()
case steamlang.EResult_InvalidLoginAuthCode: case steamlang.EResult_InvalidLoginAuthCode:
log.Fatalf("[DL] Steam auth code wrong") log.Fatalf("[DL] Steam auth code wrong")
} }
case *steam.DisconnectedEvent: case *steam.DisconnectedEvent:
log.Warningf("Steam disconnected, trying to reconnect...") log.Warningf("Steam disconnected, trying to reconnect...")
go d.connectLoop() go dml.connectLoop()
case *steam.LoginKeyEvent: case *steam.LoginKeyEvent:
log.Debug("Got login_key!") log.Debug("Got login_key!")
err := ioutil.WriteFile(d.loginKey, []byte(e.LoginKey), os.ModePerm) err := ioutil.WriteFile(dml.loginKey, []byte(e.LoginKey), os.ModePerm)
if err != nil { if err != nil {
log.Errorf("[DL] Unable write login_key: %v", err) log.Errorf("[DL] Unable write login_key: %v", err)
} }
@@ -275,23 +297,23 @@ func (d *DemoMatchLoader) steamEventHandler() {
} }
} }
func (d *DemoMatchLoader) setPlaying() { func (dml *DemoMatchLoader) setPlaying() {
d.client.GC.SetGamesPlayed(APPID) dml.client.GC.SetGamesPlayed(APPID)
d.client.GC.RegisterPacketHandler(d) dml.client.GC.RegisterPacketHandler(dml)
go d.greetGC() go dml.greetGC()
} }
func (d *DemoMatchLoader) greetGC() { func (dml *DemoMatchLoader) greetGC() {
for !d.GCReady { for !dml.GCReady {
log.Debugf("[DL] Sending GC greeting") log.Debugf("[DL] Sending GC greeting")
msg := protobuf.CMsgClientHello{} msg := protobuf.CMsgClientHello{}
d.client.GC.Write(gamecoordinator.NewGCMsgProtobuf(APPID, uint32(protobuf.EGCBaseClientMsg_k_EMsgGCClientHello), &msg)) dml.client.GC.Write(gamecoordinator.NewGCMsgProtobuf(APPID, uint32(protobuf.EGCBaseClientMsg_k_EMsgGCClientHello), &msg))
time.Sleep(500 * time.Millisecond) time.Sleep(500 * time.Millisecond)
} }
} }
func (d *DemoMatchLoader) requestDemoInfo(matchId uint64, conclusionId uint64, tokenId uint32) error { func (dml *DemoMatchLoader) requestDemoInfo(matchId uint64, conclusionId uint64, tokenId uint32) error {
if !d.GCReady { if !dml.GCReady {
return fmt.Errorf("gc not ready") return fmt.Errorf("gc not ready")
} }
@@ -299,52 +321,48 @@ func (d *DemoMatchLoader) requestDemoInfo(matchId uint64, conclusionId uint64, t
Outcomeid: &conclusionId, Outcomeid: &conclusionId,
Token: &tokenId} Token: &tokenId}
d.client.GC.Write(gamecoordinator.NewGCMsgProtobuf(APPID, uint32(protobuf.ECsgoGCMsg_k_EMsgGCCStrike15_v2_MatchListRequestFullGameInfo), &msg)) dml.client.GC.Write(gamecoordinator.NewGCMsgProtobuf(APPID, uint32(protobuf.ECsgoGCMsg_k_EMsgGCCStrike15_v2_MatchListRequestFullGameInfo), &msg))
return nil return nil
} }
func (d *DemoMatchLoader) gcWorker(apiKey string, rl ratelimit.Limiter) { func (dml *DemoMatchLoader) demoWorker() {
for demo := range d.parseDemo { for demo := range dml.dp.Done {
d.ParseMapL.Lock() dml.unlockDemo(demo)
if _, ok := d.ParseMap[demo.ShareCode]; ok { }
log.Infof("[DL] Skipping %s: parsing in progress", demo.ShareCode) }
d.ParseMapL.Unlock()
continue
} else {
d.ParseMap[demo.ShareCode] = true
}
d.ParseMapL.Unlock()
if !d.GCReady { func (dml *DemoMatchLoader) gcWorker(apiKey string, rl ratelimit.Limiter) {
for demo := range dml.parseDemo {
if !dml.IsLoading(demo) {
log.Infof("[DL] Skipping %s: parsing in progress", demo.ShareCode)
continue
}
dml.lockDemo(demo)
if !dml.GCReady {
log.Infof("[DL] Postponing match %d (%s): GC not ready", demo.MatchId, demo.ShareCode) log.Infof("[DL] Postponing match %d (%s): GC not ready", demo.MatchId, demo.ShareCode)
time.Sleep(5 * time.Second) time.Sleep(5 * time.Second)
d.ParseMapL.Lock() dml.unlockDemo(demo)
delete(d.ParseMap, demo.ShareCode) dml.parseDemo <- demo
d.ParseMapL.Unlock()
d.parseDemo <- demo
continue continue
} }
matchId, _, _, err := DecodeSharecode(demo.ShareCode) matchId, _, _, err := DecodeSharecode(demo.ShareCode)
if err != nil || matchId == 0 { if err != nil || matchId == 0 {
log.Warningf("[DL] Can't parse match with sharecode %s: %v", demo.ShareCode, err) log.Warningf("[DL] Can't parse match with sharecode %s: %v", demo.ShareCode, err)
d.ParseMapL.Lock() dml.unlockDemo(demo)
delete(d.ParseMap, demo.ShareCode)
d.ParseMapL.Unlock()
continue continue
} }
iMatch, err := d.db.Match.Get(context.Background(), matchId) iMatch, err := dml.db.Match.Get(context.Background(), matchId)
if err != nil { if err != nil {
switch e := err.(type) { switch e := err.(type) {
case *ent.NotFoundError: case *ent.NotFoundError:
break break
default: default:
log.Errorf("[DL] Failure trying to lookup match %d in db: %v", matchId, e) log.Errorf("[DL] Failure trying to lookup match %d in db: %v", matchId, e)
d.ParseMapL.Lock() dml.unlockDemo(demo)
delete(d.ParseMap, demo.ShareCode)
d.ParseMapL.Unlock()
continue continue
} }
} else { } else {
@@ -353,32 +371,26 @@ func (d *DemoMatchLoader) gcWorker(apiKey string, rl ratelimit.Limiter) {
demo.MatchId = matchId demo.MatchId = matchId
demo.Url = iMatch.ReplayURL demo.Url = iMatch.ReplayURL
demo.DecryptionKey = iMatch.DecryptionKey demo.DecryptionKey = iMatch.DecryptionKey
err := d.dp.ParseDemo(demo) err := dml.dp.ParseDemo(demo)
if err != nil { if err != nil {
log.Warningf("[DL] Parsing demo from match %d failed: %v", demo.MatchId, err) log.Warningf("[DL] Parsing demo from match %d failed: %v", demo.MatchId, err)
dml.unlockDemo(demo)
} }
d.ParseMapL.Lock()
delete(d.ParseMap, demo.ShareCode)
d.ParseMapL.Unlock()
continue continue
} }
log.Infof("[DL] Skipped match %d: already loaded", matchId) log.Infof("[DL] Skipped match %d: already loaded", matchId)
d.ParseMapL.Lock() dml.unlockDemo(demo)
delete(d.ParseMap, demo.ShareCode)
d.ParseMapL.Unlock()
continue continue
} }
log.Infof("[DL] Requesting match %d from GC", matchId) log.Infof("[DL] Requesting match %d from GC", matchId)
t := time.Now() t := time.Now()
matchDetails, err := d.getMatchDetails(demo.ShareCode) matchDetails, err := dml.getMatchDetails(demo.ShareCode)
if err != nil { if err != nil {
log.Warningf("[DL] Failure to get match-details for %d from GC: %v", demo.MatchId, err) log.Warningf("[DL] Failure to get match-details for %d from GC: %v", demo.MatchId, err)
d.ParseMapL.Lock() dml.unlockDemo(demo)
delete(d.ParseMap, demo.ShareCode)
d.ParseMapL.Unlock()
continue continue
} }
@@ -389,7 +401,7 @@ func (d *DemoMatchLoader) gcWorker(apiKey string, rl ratelimit.Limiter) {
var players []*ent.Player var players []*ent.Player
for _, accountId := range lastRound.GetReservation().GetAccountIds() { for _, accountId := range lastRound.GetReservation().GetAccountIds() {
tPlayer, err := utils.Player(d.db, AccountId2SteamId(accountId), apiKey, rl) tPlayer, err := utils.Player(dml.db, AccountId2SteamId(accountId), apiKey, rl)
if err != nil { if err != nil {
log.Warningf("[DL] Unable to get player for steamid %d: %v", AccountId2SteamId(accountId), err) log.Warningf("[DL] Unable to get player for steamid %d: %v", AccountId2SteamId(accountId), err)
continue continue
@@ -401,7 +413,7 @@ func (d *DemoMatchLoader) gcWorker(apiKey string, rl ratelimit.Limiter) {
demo.MatchId = matchZero.GetMatchid() demo.MatchId = matchZero.GetMatchid()
demo.DecryptionKey = []byte(strings.ToUpper(strconv.FormatUint(matchZero.GetWatchablematchinfo().GetClDecryptdataKeyPub(), 16))) demo.DecryptionKey = []byte(strings.ToUpper(strconv.FormatUint(matchZero.GetWatchablematchinfo().GetClDecryptdataKeyPub(), 16)))
tMatch, err := d.db.Match.Create(). tMatch, err := dml.db.Match.Create().
SetID(matchZero.GetMatchid()). SetID(matchZero.GetMatchid()).
AddPlayers(players...). AddPlayers(players...).
SetDate(time.Unix(int64(matchZero.GetMatchtime()), 0).UTC()). SetDate(time.Unix(int64(matchZero.GetMatchtime()), 0).UTC()).
@@ -416,9 +428,7 @@ func (d *DemoMatchLoader) gcWorker(apiKey string, rl ratelimit.Limiter) {
Save(context.Background()) Save(context.Background())
if err != nil { if err != nil {
log.Warningf("[DL] Unable to create match %d: %v", matchZero.GetMatchid(), err) log.Warningf("[DL] Unable to create match %d: %v", matchZero.GetMatchid(), err)
d.ParseMapL.Lock() dml.unlockDemo(demo)
delete(d.ParseMap, demo.ShareCode)
d.ParseMapL.Unlock()
continue continue
} }
@@ -457,7 +467,7 @@ func (d *DemoMatchLoader) gcWorker(apiKey string, rl ratelimit.Limiter) {
} }
kills, deaths, assists, hs, score, mvp := playerStatsFromRound(lastRound, mPlayer) kills, deaths, assists, hs, score, mvp := playerStatsFromRound(lastRound, mPlayer)
err := d.db.MatchPlayer.Create(). err := dml.db.MatchPlayer.Create().
SetMatches(tMatch). SetMatches(tMatch).
SetPlayers(mPlayer). SetPlayers(mPlayer).
SetTeamID(teamId). SetTeamID(teamId).
@@ -479,7 +489,7 @@ func (d *DemoMatchLoader) gcWorker(apiKey string, rl ratelimit.Limiter) {
// clear cache or regen values player // clear cache or regen values player
for _, p := range players { for _, p := range players {
err = d.cache.Delete(context.Background(), fmt.Sprintf(utils.SideMetaCacheKey, p.ID)) err = dml.cache.Delete(context.Background(), fmt.Sprintf(utils.SideMetaCacheKey, p.ID))
if err != nil { if err != nil {
log.Warningf("[DL] Unable to delete cache key %s: %v", fmt.Sprintf(utils.SideMetaCacheKey, p.ID), err) log.Warningf("[DL] Unable to delete cache key %s: %v", fmt.Sprintf(utils.SideMetaCacheKey, p.ID), err)
} }
@@ -495,12 +505,10 @@ func (d *DemoMatchLoader) gcWorker(apiKey string, rl ratelimit.Limiter) {
} }
} }
err = d.dp.ParseDemo(demo) err = dml.dp.ParseDemo(demo)
if err != nil { if err != nil {
log.Warningf("[DL] Can't queue demo %d for parsing: %v", demo.MatchId, err) log.Warningf("[DL] Can't queue demo %d for parsing: %v", demo.MatchId, err)
dml.unlockDemo(demo)
} }
d.ParseMapL.Lock()
delete(d.ParseMap, demo.ShareCode)
d.ParseMapL.Unlock()
} }
} }

View File

@@ -32,6 +32,7 @@ type DemoParser struct {
tempDir string tempDir string
db *ent.Client db *ent.Client
sprayTimeout int sprayTimeout int
Done chan *Demo
} }
type Encounter struct { type Encounter struct {
@@ -105,19 +106,20 @@ func (s *Sprays) Avg() (avg [][]float32) {
return return
} }
func (p *DemoParser) Setup(db *ent.Client, worker int, sprayTimeout int) error { func (dp *DemoParser) Setup(db *ent.Client, worker int, sprayTimeout int) error {
p.demoQueue = make(chan *Demo, 1000) dp.demoQueue = make(chan *Demo, 1000)
p.db = db dp.db = db
p.sprayTimeout = sprayTimeout dp.sprayTimeout = sprayTimeout
dp.Done = make(chan *Demo, worker)
for i := 0; i < worker; i++ { for i := 0; i < worker; i++ {
go p.parseWorker() go dp.parseWorker()
} }
return nil return nil
} }
func (p *DemoParser) ParseDemo(demo *Demo) error { func (dp *DemoParser) ParseDemo(demo *Demo) error {
select { select {
case p.demoQueue <- demo: case dp.demoQueue <- demo:
return nil return nil
default: default:
return fmt.Errorf("queue full") return fmt.Errorf("queue full")
@@ -138,8 +140,8 @@ func (d *Demo) download() (io.Reader, error) {
return bzip2.NewReader(r.Body), nil return bzip2.NewReader(r.Body), nil
} }
func (p *DemoParser) getDBPlayer(demo *Demo, demoPlayer *common.Player) (*ent.MatchPlayer, error) { func (dp *DemoParser) getDBPlayer(demo *Demo, demoPlayer *common.Player) (*ent.MatchPlayer, error) {
tMatchPlayer, err := p.db.MatchPlayer.Query().WithMatches(func(q *ent.MatchQuery) { tMatchPlayer, err := dp.db.MatchPlayer.Query().WithMatches(func(q *ent.MatchQuery) {
q.Where(match.ID(demo.MatchId)) q.Where(match.ID(demo.MatchId))
}).WithPlayers(func(q *ent.PlayerQuery) { }).WithPlayers(func(q *ent.PlayerQuery) {
q.Where(player.ID(demoPlayer.SteamID64)) q.Where(player.ID(demoPlayer.SteamID64))
@@ -151,7 +153,7 @@ func (p *DemoParser) getDBPlayer(demo *Demo, demoPlayer *common.Player) (*ent.Ma
return tMatchPlayer, nil return tMatchPlayer, nil
} }
func (p *DemoParser) MatchPlayerBySteamID(stats []*ent.MatchPlayer, steamId uint64) (*ent.MatchPlayer, error) { func (dp *DemoParser) MatchPlayerBySteamID(stats []*ent.MatchPlayer, steamId uint64) (*ent.MatchPlayer, error) {
for _, tStats := range stats { for _, tStats := range stats {
tPLayer, err := tStats.Edges.PlayersOrErr() tPLayer, err := tStats.Edges.PlayersOrErr()
if err != nil { if err != nil {
@@ -190,21 +192,24 @@ func setMatchPlayerColor(matchPlayer *ent.MatchPlayer, demoPlayer *common.Player
} }
} }
func (p *DemoParser) parseWorker() { func (dp *DemoParser) parseWorker() {
for demo := range p.demoQueue { for demo := range dp.demoQueue {
if demo.MatchId == 0 { if demo.MatchId == 0 {
log.Warningf("[DP] can't parse match %s: no matchid found", demo.ShareCode) log.Warningf("[DP] can't parse match %s: no matchid found", demo.ShareCode)
dp.Done <- demo
continue continue
} }
tMatch, err := p.db.Match.Get(context.Background(), demo.MatchId) tMatch, err := dp.db.Match.Get(context.Background(), demo.MatchId)
if err != nil { if err != nil {
log.Errorf("[DP] Unable to get match %d: %v", demo.MatchId, err) log.Errorf("[DP] Unable to get match %d: %v", demo.MatchId, err)
dp.Done <- demo
continue continue
} }
if tMatch.DemoParsed { if tMatch.DemoParsed {
log.Infof("[DP] skipped already parsed %d", demo.MatchId) log.Infof("[DP] skipped already parsed %d", demo.MatchId)
dp.Done <- demo
continue continue
} }
@@ -217,9 +222,11 @@ func (p *DemoParser) parseWorker() {
} else { } else {
log.Infof("[DP] demo 404 not found for match %d. Trying again later.", demo.MatchId) log.Infof("[DP] demo 404 not found for match %d. Trying again later.", demo.MatchId)
} }
dp.Done <- demo
continue continue
} else { } else {
log.Errorf("[DP] Unable to download demo for %d: %v", demo.MatchId, err) log.Errorf("[DP] Unable to download demo for %d: %v", demo.MatchId, err)
dp.Done <- demo
continue continue
} }
} }
@@ -228,6 +235,7 @@ func (p *DemoParser) parseWorker() {
tStats, err := tMatch.QueryStats().WithPlayers().All(context.Background()) tStats, err := tMatch.QueryStats().WithPlayers().All(context.Background())
if err != nil { if err != nil {
log.Errorf("[DP] Failed to find players for match %d: %v", demo.MatchId, err) log.Errorf("[DP] Failed to find players for match %d: %v", demo.MatchId, err)
dp.Done <- demo
continue continue
} }
@@ -255,7 +263,7 @@ func (p *DemoParser) parseWorker() {
// onChatMessage // onChatMessage
demoParser.RegisterEventHandler(func(e events.ChatMessage) { demoParser.RegisterEventHandler(func(e events.ChatMessage) {
gs := demoParser.GameState() gs := demoParser.GameState()
tAttacker, err := p.MatchPlayerBySteamID(tStats, e.Sender.SteamID64) tAttacker, err := dp.MatchPlayerBySteamID(tStats, e.Sender.SteamID64)
if err != nil { if err != nil {
log.Warningf("[DP] Unable to get player for id %d: %v", e.Sender.SteamID64, err) log.Warningf("[DP] Unable to get player for id %d: %v", e.Sender.SteamID64, err)
return return
@@ -292,7 +300,7 @@ func (p *DemoParser) parseWorker() {
for _, spray := range spays { for _, spray := range spays {
if e.Shooter.SteamID64 == spray.Sprayer && int(e.Weapon.Type) == spray.Weapon { if e.Shooter.SteamID64 == spray.Sprayer && int(e.Weapon.Type) == spray.Weapon {
playerWeaponFound = true playerWeaponFound = true
spray.Add(demoParser.CurrentTime(), []float32{e.Shooter.ViewDirectionX(), e.Shooter.ViewDirectionY()}, p.sprayTimeout) spray.Add(demoParser.CurrentTime(), []float32{e.Shooter.ViewDirectionX(), e.Shooter.ViewDirectionY()}, dp.sprayTimeout)
} }
} }
@@ -311,7 +319,7 @@ func (p *DemoParser) parseWorker() {
return return
} }
tAttacker, err := p.MatchPlayerBySteamID(tStats, e.Attacker.SteamID64) tAttacker, err := dp.MatchPlayerBySteamID(tStats, e.Attacker.SteamID64)
if err != nil { if err != nil {
log.Warningf("[DP] Unable to get player for id %d: %v", e.Attacker.SteamID64, err) log.Warningf("[DP] Unable to get player for id %d: %v", e.Attacker.SteamID64, err)
return return
@@ -373,7 +381,7 @@ func (p *DemoParser) parseWorker() {
return return
} }
tAttacker, err := p.MatchPlayerBySteamID(tStats, e.Attacker.SteamID64) tAttacker, err := dp.MatchPlayerBySteamID(tStats, e.Attacker.SteamID64)
if err != nil { if err != nil {
log.Warningf("[DP] Unable to get player for id %d: %v", e.Attacker.SteamID64, err) log.Warningf("[DP] Unable to get player for id %d: %v", e.Attacker.SteamID64, err)
return return
@@ -401,7 +409,7 @@ func (p *DemoParser) parseWorker() {
// onPlayerConnected // onPlayerConnected
demoParser.RegisterEventHandler(func(e events.PlayerTeamChange) { demoParser.RegisterEventHandler(func(e events.PlayerTeamChange) {
if e.Player != nil && e.Player.SteamID64 != 0 { if e.Player != nil && e.Player.SteamID64 != 0 {
tMatchPlayer, err := p.MatchPlayerBySteamID(tStats, e.Player.SteamID64) tMatchPlayer, err := dp.MatchPlayerBySteamID(tStats, e.Player.SteamID64)
if err != nil { if err != nil {
log.Warningf("[DP] Unable to get player for id %d: %v", e.Player.SteamID64, err) log.Warningf("[DP] Unable to get player for id %d: %v", e.Player.SteamID64, err)
return return
@@ -416,7 +424,7 @@ func (p *DemoParser) parseWorker() {
for _, demoPlayer := range gs.Participants().Playing() { for _, demoPlayer := range gs.Participants().Playing() {
if demoPlayer != nil && demoPlayer.SteamID64 != 0 { if demoPlayer != nil && demoPlayer.SteamID64 != 0 {
tMatchPlayer, err := p.MatchPlayerBySteamID(tStats, demoPlayer.SteamID64) tMatchPlayer, err := dp.MatchPlayerBySteamID(tStats, demoPlayer.SteamID64)
if err != nil { if err != nil {
log.Warningf("[DP] Unable to get player for id %d: %v", demoPlayer.SteamID64, err) log.Warningf("[DP] Unable to get player for id %d: %v", demoPlayer.SteamID64, err)
return return
@@ -429,7 +437,7 @@ func (p *DemoParser) parseWorker() {
// onRankUpdate // onRankUpdate
demoParser.RegisterEventHandler(func(e events.RankUpdate) { demoParser.RegisterEventHandler(func(e events.RankUpdate) {
if e.SteamID64() != 0 { if e.SteamID64() != 0 {
tMatchPlayer, err := p.MatchPlayerBySteamID(tStats, e.SteamID64()) tMatchPlayer, err := dp.MatchPlayerBySteamID(tStats, e.SteamID64())
if err != nil { if err != nil {
log.Warningf("[DP] Unable to get player for id %d: %v", e.SteamID64(), err) log.Warningf("[DP] Unable to get player for id %d: %v", e.SteamID64(), err)
return return
@@ -443,6 +451,7 @@ func (p *DemoParser) parseWorker() {
err = demoParser.ParseToEnd() err = demoParser.ParseToEnd()
if err != nil { if err != nil {
log.Errorf("[DP] Error parsing replay: %v", err) log.Errorf("[DP] Error parsing replay: %v", err)
dp.Done <- demo
continue continue
} }
@@ -453,6 +462,7 @@ func (p *DemoParser) parseWorker() {
Exec(context.Background()) Exec(context.Background())
if err != nil { if err != nil {
log.Errorf("[DP] Unable to update match %d in database: %v", demo.MatchId, err) log.Errorf("[DP] Unable to update match %d in database: %v", demo.MatchId, err)
dp.Done <- demo
continue continue
} }
@@ -487,14 +497,14 @@ func (p *DemoParser) parseWorker() {
} }
for _, eqDmg := range eqMap[tMatchPlayer.PlayerStats] { for _, eqDmg := range eqMap[tMatchPlayer.PlayerStats] {
err := p.db.Weapon.Create().SetStat(nMatchPLayer).SetDmg(eqDmg.Dmg).SetVictim(eqDmg.To).SetHitGroup(eqDmg.HitGroup).SetEqType(eqDmg.Eq).Exec(context.Background()) err := dp.db.Weapon.Create().SetStat(nMatchPLayer).SetDmg(eqDmg.Dmg).SetVictim(eqDmg.To).SetHitGroup(eqDmg.HitGroup).SetEqType(eqDmg.Eq).Exec(context.Background())
if err != nil { if err != nil {
log.Errorf("[DP] Unable to create WeaponStat: %v", err) log.Errorf("[DP] Unable to create WeaponStat: %v", err)
} }
} }
for _, eco := range ecoMap[tMatchPlayer.PlayerStats] { for _, eco := range ecoMap[tMatchPlayer.PlayerStats] {
err := p.db.RoundStats.Create().SetMatchPlayer(nMatchPLayer).SetRound(uint(eco.Round)).SetBank(uint(eco.Bank)).SetEquipment(uint(eco.EqV)).SetSpent(uint(eco.Spent)).Exec(context.Background()) err := dp.db.RoundStats.Create().SetMatchPlayer(nMatchPLayer).SetRound(uint(eco.Round)).SetBank(uint(eco.Bank)).SetEquipment(uint(eco.EqV)).SetSpent(uint(eco.Spent)).Exec(context.Background())
if err != nil { if err != nil {
log.Errorf("[DP] Unable to create RoundStat: %v", err) log.Errorf("[DP] Unable to create RoundStat: %v", err)
} }
@@ -511,7 +521,7 @@ func (p *DemoParser) parseWorker() {
continue continue
} }
err = p.db.Spray.Create().SetMatchPlayers(nMatchPLayer).SetWeapon(spray.Weapon).SetSpray(sprayBuf.Bytes()).Exec(context.Background()) err = dp.db.Spray.Create().SetMatchPlayers(nMatchPLayer).SetWeapon(spray.Weapon).SetSpray(sprayBuf.Bytes()).Exec(context.Background())
if err != nil { if err != nil {
log.Warningf("[DP] Failure adding spray to database: %v", err) log.Warningf("[DP] Failure adding spray to database: %v", err)
} }
@@ -521,10 +531,10 @@ func (p *DemoParser) parseWorker() {
bulk := make([]*ent.MessagesCreate, 0) bulk := make([]*ent.MessagesCreate, 0)
for _, msg := range tMatchPlayer.Edges.Messages { for _, msg := range tMatchPlayer.Edges.Messages {
bulk = append(bulk, p.db.Messages.Create().SetMessage(msg.Message).SetAllChat(msg.AllChat).SetTick(msg.Tick).SetMatchPlayer(tMatchPlayer)) bulk = append(bulk, dp.db.Messages.Create().SetMessage(msg.Message).SetAllChat(msg.AllChat).SetTick(msg.Tick).SetMatchPlayer(tMatchPlayer))
} }
if len(bulk) > 0 { if len(bulk) > 0 {
err = p.db.Messages.CreateBulk(bulk...).Exec(context.Background()) err = dp.db.Messages.CreateBulk(bulk...).Exec(context.Background())
if err != nil { if err != nil {
log.Warningf("[DP] Failure adding messages to database: %v", err) log.Warningf("[DP] Failure adding messages to database: %v", err)
} }
@@ -537,5 +547,6 @@ func (p *DemoParser) parseWorker() {
if err != nil { if err != nil {
log.Errorf("[DP] Unable close demo file for match %d: %v", demo.MatchId, err) log.Errorf("[DP] Unable close demo file for match %d: %v", demo.MatchId, err)
} }
dp.Done <- demo
} }
} }

View File

@@ -174,18 +174,16 @@ func housekeeping() {
} }
for _, m := range tMatches { for _, m := range tMatches {
demoLoader.ParseMapL.Lock() demo := &csgo.Demo{MatchId: m.ID, ShareCode: m.ShareCode}
if _, ok := demoLoader.ParseMap[m.ShareCode]; ok { if demoLoader.IsLoading(demo) {
log.Infof("[HK] Skipping %s: parsing in progress", m.ShareCode) log.Infof("[HK] Skipping %s: parsing in progress", m.ShareCode)
demoLoader.ParseMapL.Unlock()
continue continue
} }
log.Infof("[HK] Try reparsing match %d, played on %s", m.ID, m.Date) log.Infof("[HK] Try reparsing match %d, played on %s", m.ID, m.Date)
err := demoLoader.LoadDemo(&csgo.Demo{MatchId: m.ID, ShareCode: m.ShareCode}) err := demoLoader.LoadDemo(demo)
if err != nil { if err != nil {
log.Warningf("[HK] Failure trying to parse match %d: %v", m.ID, err) log.Warningf("[HK] Failure trying to parse match %d: %v", m.ID, err)
} }
demoLoader.ParseMapL.Unlock()
} }
// check for inconsistent matches // check for inconsistent matches