refactored some functions, added demo download retry

This commit is contained in:
2021-10-09 19:10:38 +02:00
parent 15b273f052
commit e938b05f52
13 changed files with 242 additions and 322 deletions

View File

@@ -1,6 +1,10 @@
package csgo package csgo
import ( import (
"context"
"csgowtfd/ent"
"csgowtfd/ent/match"
"csgowtfd/utils"
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/an0nfunc/go-steam/v3" "github.com/an0nfunc/go-steam/v3"
@@ -9,10 +13,12 @@ import (
"github.com/an0nfunc/go-steam/v3/protocol/gamecoordinator" "github.com/an0nfunc/go-steam/v3/protocol/gamecoordinator"
"github.com/an0nfunc/go-steam/v3/protocol/steamlang" "github.com/an0nfunc/go-steam/v3/protocol/steamlang"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"go.uber.org/ratelimit"
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
"io/ioutil" "io/ioutil"
"math/rand" "math/rand"
"os" "os"
"sync"
"time" "time"
) )
@@ -21,6 +27,20 @@ const (
APPID = 730 APPID = 730
) )
type DemoMatchLoaderConfig struct {
Username string
Password string
AuthCode string
Sentry string
LoginKey string
ServerList string
Db *ent.Client
Lock *sync.RWMutex
Worker int
ApiKey string
RateLimit ratelimit.Limiter
}
type DemoMatchLoader struct { type DemoMatchLoader struct {
client *steam.Client client *steam.Client
GCReady bool GCReady bool
@@ -30,6 +50,10 @@ type DemoMatchLoader struct {
sentryFile string sentryFile string
loginKey string loginKey string
serverList string serverList string
db *ent.Client
lock *sync.RWMutex
dp *DemoParser
parseDemo chan *Demo
} }
func AccountId2SteamId(accId uint32) uint64 { func AccountId2SteamId(accId uint32) uint64 {
@@ -82,7 +106,7 @@ func (d *DemoMatchLoader) HandleGCPacket(pkg *gamecoordinator.GCPacket) {
} }
} }
func (d *DemoMatchLoader) GetMatchDetails(sharecode string) (*protobuf.CMsgGCCStrike15V2_MatchList, error) { func (d *DemoMatchLoader) getMatchDetails(sharecode string) (*protobuf.CMsgGCCStrike15V2_MatchList, error) {
if !d.GCReady { if !d.GCReady {
return nil, fmt.Errorf("gc not ready") return nil, fmt.Errorf("gc not ready")
} }
@@ -91,18 +115,18 @@ func (d *DemoMatchLoader) GetMatchDetails(sharecode string) (*protobuf.CMsgGCCSt
if err != nil { if err != nil {
return nil, err return nil, err
} }
err = d.RequestDemoInfo(matchId, outcomeId, tokenId) err = d.requestDemoInfo(matchId, outcomeId, tokenId)
if err != nil { if err != nil {
return nil, err return nil, err
} }
for { for {
select { select {
case match := <-d.matchRecv: case matchDetails := <-d.matchRecv:
if *match.Matches[0].Matchid == matchId { if *matchDetails.Matches[0].Matchid == matchId {
return match, nil return matchDetails, nil
} else { } else {
d.matchRecv <- match d.matchRecv <- matchDetails
} }
} }
} }
@@ -127,15 +151,15 @@ func (d *DemoMatchLoader) connectToSteam() error {
return nil return nil
} }
func (d *DemoMatchLoader) Setup(username string, password string, authCode string, sentry string, loginKey string, serverList string) error { func (d *DemoMatchLoader) Setup(config *DemoMatchLoaderConfig) error {
d.loginKey = loginKey d.loginKey = config.LoginKey
d.sentryFile = sentry d.sentryFile = config.Sentry
d.serverList = serverList d.serverList = config.ServerList
d.steamLogin = new(steam.LogOnDetails) d.steamLogin = new(steam.LogOnDetails)
d.steamLogin.Username = username d.steamLogin.Username = config.Username
d.steamLogin.Password = password d.steamLogin.Password = config.Password
d.steamLogin.AuthCode = authCode d.steamLogin.AuthCode = config.AuthCode
d.steamLogin.ShouldRememberPassword = true d.steamLogin.ShouldRememberPassword = true
if _, err := os.Stat(d.sentryFile); err == nil { if _, err := os.Stat(d.sentryFile); err == nil {
@@ -166,14 +190,28 @@ func (d *DemoMatchLoader) Setup(username string, password string, authCode strin
} }
d.client = steam.NewClient() d.client = steam.NewClient()
d.matchRecv = make(chan *protobuf.CMsgGCCStrike15V2_MatchList, 500) d.matchRecv = make(chan *protobuf.CMsgGCCStrike15V2_MatchList, 1000)
d.parseDemo = make(chan *Demo, 1000)
go d.connectLoop() go d.connectLoop()
go d.steamEventHandler() go d.steamEventHandler()
for i := 0; i < config.Worker; i++ {
go d.gcWorker(config.ApiKey, config.RateLimit)
}
return nil return nil
} }
func (d DemoMatchLoader) LoadDemo(demo *Demo) error {
select {
case d.parseDemo <- demo:
return nil
default:
return fmt.Errorf("queue full")
}
}
func (d DemoMatchLoader) connectLoop() { func (d DemoMatchLoader) connectLoop() {
for d.connectToSteam() != nil { for d.connectToSteam() != nil {
log.Infof("Retrying connecting to steam") log.Infof("Retrying connecting to steam")
@@ -207,15 +245,15 @@ func (d *DemoMatchLoader) steamEventHandler() {
case *steam.LoggedOnEvent: case *steam.LoggedOnEvent:
log.Debug("[DL] Login successfully!") log.Debug("[DL] Login successfully!")
d.client.Social.SetPersonaState(steamlang.EPersonaState_Online) d.client.Social.SetPersonaState(steamlang.EPersonaState_Online)
go d.SetPlaying() go d.setPlaying()
case *steam.LogOnFailedEvent: case *steam.LogOnFailedEvent:
log.Warningf("[DL] Steam login denied: %+v", e) log.Warningf("[DL] Steam login denied: %+v", e)
switch e.Result { switch e.Result {
case steamlang.EResult_AccountLogonDenied: case steamlang.EResult_AccountLogonDenied:
log.Fatalf("[DL] Please provide AuthCode with --authcode") log.Fatalf("[DL] Please provide AuthCode with --authcode")
case steamlang.EResult_InvalidPassword: case steamlang.EResult_InvalidPassword:
os.Remove(d.sentryFile) _ = os.Remove(d.sentryFile)
os.Remove(d.loginKey) _ = os.Remove(d.loginKey)
log.Fatalf("[DL] Steam login wrong") log.Fatalf("[DL] Steam login wrong")
case steamlang.EResult_InvalidLoginAuthCode: case steamlang.EResult_InvalidLoginAuthCode:
log.Fatalf("[DL] Steam auth code wrong") log.Fatalf("[DL] Steam auth code wrong")
@@ -242,7 +280,7 @@ func (d *DemoMatchLoader) steamEventHandler() {
} }
} }
func (d *DemoMatchLoader) SetPlaying() { func (d *DemoMatchLoader) setPlaying() {
d.client.GC.SetGamesPlayed(APPID) d.client.GC.SetGamesPlayed(APPID)
d.client.GC.RegisterPacketHandler(d) d.client.GC.RegisterPacketHandler(d)
go d.greetGC() go d.greetGC()
@@ -257,7 +295,7 @@ func (d *DemoMatchLoader) greetGC() {
} }
} }
func (d *DemoMatchLoader) RequestDemoInfo(matchId uint64, conclusionId uint64, tokenId uint32) error { func (d *DemoMatchLoader) requestDemoInfo(matchId uint64, conclusionId uint64, tokenId uint32) error {
if !d.GCReady { if !d.GCReady {
return fmt.Errorf("gc not ready") return fmt.Errorf("gc not ready")
} }
@@ -270,3 +308,121 @@ func (d *DemoMatchLoader) RequestDemoInfo(matchId uint64, conclusionId uint64, t
return nil return nil
} }
func (d *DemoMatchLoader) gcWorker(apiKey string, rl ratelimit.Limiter) {
for {
select {
case demo := <-d.parseDemo:
if !d.GCReady {
time.Sleep(5 * time.Second)
d.parseDemo <- demo
continue
}
matchId, _, _, err := DecodeSharecode(demo.ShareCode)
if err != nil || matchId == 0 {
log.Warningf("[DL] Can't parse match with sharecode %s: %v", demo.ShareCode, err)
continue
}
d.lock.RLock()
iMatch, err := d.db.Match.Query().Where(match.ID(matchId)).Only(context.Background())
d.lock.RUnlock()
if err != nil {
switch e := err.(type) {
case *ent.NotFoundError:
break
default:
log.Errorf("[DL] Failure trying to find match %d in db: %v", matchId, e)
}
} else {
if iMatch.DemoParsed == false && iMatch.Date.Before(time.Now().UTC().AddDate(0, 0, -30)) {
log.Infof("[DL] Match %d is loaded, but not parsed. Try parsing.", demo.MatchId)
demo.MatchId = matchId
demo.Url = iMatch.ReplayURL
err := d.dp.ParseDemo(demo)
if err != nil {
log.Warningf("[DL] Parsing demo from match %d failed: %v", demo.MatchId, err)
}
continue
}
log.Debugf("[DL] Skipped match %d: already parsed", matchId)
continue
}
matchDetails, err := d.getMatchDetails(demo.ShareCode)
if err != nil {
log.Warningf("[DL] Failure to get match-details for %d from GC: %v", demo.MatchId, err)
continue
}
matchZero := matchDetails.GetMatches()[0]
lastRound := matchZero.GetRoundstatsall()[len(matchZero.Roundstatsall)-1]
var players []*ent.Player
for _, accountId := range lastRound.GetReservation().GetAccountIds() {
tPlayer, err := utils.GetPlayer(&utils.DBWithLock{
Client: d.db,
Lock: d.lock,
}, AccountId2SteamId(accountId), apiKey, rl)
if err != nil {
log.Warningf("[DL] Unable to get player for steamid %d: %v", AccountId2SteamId(accountId), err)
continue
}
players = append(players, tPlayer)
}
demo.Url = lastRound.GetMap()
demo.MatchId = matchZero.GetMatchid()
d.lock.Lock()
tMatch, err := d.db.Match.Create().
SetID(matchZero.GetMatchid()).
AddPlayers(players...).
SetDate(time.Unix(int64(matchZero.GetMatchtime()), 0).UTC()).
SetMaxRounds(int(lastRound.GetMaxRounds())).
SetDuration(int(lastRound.GetMatchDuration())).
SetShareCode(demo.ShareCode).
SetReplayURL(lastRound.GetMap()).
SetScoreTeamA(int(lastRound.GetTeamScores()[0])).
SetScoreTeamB(int(lastRound.GetTeamScores()[1])).
SetMatchResult(int(lastRound.GetMatchResult())).
Save(context.Background())
d.lock.Unlock()
if err != nil {
log.Warningf("[DL] Unable to create match %d: %v", matchZero.GetMatchid(), err)
continue
}
for i, mPlayer := range players {
var teamId int
if i > 4 {
teamId = 2
} else {
teamId = 1
}
d.lock.Lock()
err := d.db.Stats.Create().
SetMatches(tMatch).
SetPlayers(mPlayer).
SetTeamID(teamId).
SetKills(int(lastRound.GetKills()[i])).
SetDeaths(int(lastRound.GetDeaths()[i])).
SetAssists(int(lastRound.GetAssists()[i])).
SetMvp(int(lastRound.GetMvps()[i])).
SetScore(int(lastRound.GetScores()[i])).
SetHeadshot(int(lastRound.GetEnemyHeadshots()[i])).
Exec(context.Background())
d.lock.Unlock()
if err != nil {
log.Warningf("[DL] Unable to create stats for player %d in match %d: %v", mPlayer.ID, tMatch.ID, err)
}
}
err = d.dp.ParseDemo(demo)
if err != nil {
log.Warningf("[DL] Can't queue demo %d for parsing: %v", demo.MatchId, err)
}
}
}
}

View File

@@ -21,9 +21,6 @@ type Demo struct {
ShareCode string ShareCode string
MatchId uint64 MatchId uint64
Url string Url string
Rank int
Tickrate int
File string
} }
type DemoParser struct { type DemoParser struct {
@@ -128,14 +125,7 @@ func (p *DemoParser) parseWorker() {
if err != nil { if err != nil {
switch e := err.(type) { switch e := err.(type) {
case DemoNotFoundError: case DemoNotFoundError:
p.lock.Lock() log.Warningf("[DP] Demo not found for %d. Maybe temp.?", demo.MatchId)
err := tMatch.Update().SetDemoExpired(true).Exec(context.Background())
p.lock.Unlock()
if err != nil {
log.Errorf("[DP] Unable to set demo expire for match %d: %v", demo.MatchId, e)
continue
}
log.Warningf("[DP] Demo already expired for %d", demo.MatchId)
continue continue
default: default:
log.Warningf("[DP] Unable to download demo for %d: %v", demo.MatchId, e) log.Warningf("[DP] Unable to download demo for %d: %v", demo.MatchId, e)

View File

@@ -35,8 +35,6 @@ type Match struct {
MatchResult int `json:"match_result,omitempty"` MatchResult int `json:"match_result,omitempty"`
// MaxRounds holds the value of the "max_rounds" field. // MaxRounds holds the value of the "max_rounds" field.
MaxRounds int `json:"max_rounds,omitempty"` MaxRounds int `json:"max_rounds,omitempty"`
// DemoExpired holds the value of the "demo_expired" field.
DemoExpired bool `json:"demo_expired,omitempty"`
// DemoParsed holds the value of the "demo_parsed" field. // DemoParsed holds the value of the "demo_parsed" field.
DemoParsed bool `json:"demo_parsed,omitempty"` DemoParsed bool `json:"demo_parsed,omitempty"`
// Eco holds the value of the "eco" field. // Eco holds the value of the "eco" field.
@@ -88,7 +86,7 @@ func (*Match) scanValues(columns []string) ([]interface{}, error) {
switch columns[i] { switch columns[i] {
case match.FieldEco: case match.FieldEco:
values[i] = new([]byte) values[i] = new([]byte)
case match.FieldDemoExpired, match.FieldDemoParsed: case match.FieldDemoParsed:
values[i] = new(sql.NullBool) values[i] = new(sql.NullBool)
case match.FieldID, match.FieldScoreTeamA, match.FieldScoreTeamB, match.FieldDuration, match.FieldMatchResult, match.FieldMaxRounds: case match.FieldID, match.FieldScoreTeamA, match.FieldScoreTeamB, match.FieldDuration, match.FieldMatchResult, match.FieldMaxRounds:
values[i] = new(sql.NullInt64) values[i] = new(sql.NullInt64)
@@ -171,12 +169,6 @@ func (m *Match) assignValues(columns []string, values []interface{}) error {
} else if value.Valid { } else if value.Valid {
m.MaxRounds = int(value.Int64) m.MaxRounds = int(value.Int64)
} }
case match.FieldDemoExpired:
if value, ok := values[i].(*sql.NullBool); !ok {
return fmt.Errorf("unexpected type %T for field demo_expired", values[i])
} else if value.Valid {
m.DemoExpired = value.Bool
}
case match.FieldDemoParsed: case match.FieldDemoParsed:
if value, ok := values[i].(*sql.NullBool); !ok { if value, ok := values[i].(*sql.NullBool); !ok {
return fmt.Errorf("unexpected type %T for field demo_parsed", values[i]) return fmt.Errorf("unexpected type %T for field demo_parsed", values[i])
@@ -247,8 +239,6 @@ func (m *Match) String() string {
builder.WriteString(fmt.Sprintf("%v", m.MatchResult)) builder.WriteString(fmt.Sprintf("%v", m.MatchResult))
builder.WriteString(", max_rounds=") builder.WriteString(", max_rounds=")
builder.WriteString(fmt.Sprintf("%v", m.MaxRounds)) builder.WriteString(fmt.Sprintf("%v", m.MaxRounds))
builder.WriteString(", demo_expired=")
builder.WriteString(fmt.Sprintf("%v", m.DemoExpired))
builder.WriteString(", demo_parsed=") builder.WriteString(", demo_parsed=")
builder.WriteString(fmt.Sprintf("%v", m.DemoParsed)) builder.WriteString(fmt.Sprintf("%v", m.DemoParsed))
builder.WriteString(", eco=") builder.WriteString(", eco=")

View File

@@ -25,8 +25,6 @@ const (
FieldMatchResult = "match_result" FieldMatchResult = "match_result"
// FieldMaxRounds holds the string denoting the max_rounds field in the database. // FieldMaxRounds holds the string denoting the max_rounds field in the database.
FieldMaxRounds = "max_rounds" FieldMaxRounds = "max_rounds"
// FieldDemoExpired holds the string denoting the demo_expired field in the database.
FieldDemoExpired = "demo_expired"
// FieldDemoParsed holds the string denoting the demo_parsed field in the database. // FieldDemoParsed holds the string denoting the demo_parsed field in the database.
FieldDemoParsed = "demo_parsed" FieldDemoParsed = "demo_parsed"
// FieldEco holds the string denoting the eco field in the database. // FieldEco holds the string denoting the eco field in the database.
@@ -63,7 +61,6 @@ var Columns = []string{
FieldDuration, FieldDuration,
FieldMatchResult, FieldMatchResult,
FieldMaxRounds, FieldMaxRounds,
FieldDemoExpired,
FieldDemoParsed, FieldDemoParsed,
FieldEco, FieldEco,
} }
@@ -85,8 +82,6 @@ func ValidColumn(column string) bool {
} }
var ( var (
// DefaultDemoExpired holds the default value on creation for the "demo_expired" field.
DefaultDemoExpired bool
// DefaultDemoParsed holds the default value on creation for the "demo_parsed" field. // DefaultDemoParsed holds the default value on creation for the "demo_parsed" field.
DefaultDemoParsed bool DefaultDemoParsed bool
) )

View File

@@ -156,13 +156,6 @@ func MaxRounds(v int) predicate.Match {
}) })
} }
// DemoExpired applies equality check predicate on the "demo_expired" field. It's identical to DemoExpiredEQ.
func DemoExpired(v bool) predicate.Match {
return predicate.Match(func(s *sql.Selector) {
s.Where(sql.EQ(s.C(FieldDemoExpired), v))
})
}
// DemoParsed applies equality check predicate on the "demo_parsed" field. It's identical to DemoParsedEQ. // DemoParsed applies equality check predicate on the "demo_parsed" field. It's identical to DemoParsedEQ.
func DemoParsed(v bool) predicate.Match { func DemoParsed(v bool) predicate.Match {
return predicate.Match(func(s *sql.Selector) { return predicate.Match(func(s *sql.Selector) {
@@ -987,20 +980,6 @@ func MaxRoundsLTE(v int) predicate.Match {
}) })
} }
// DemoExpiredEQ applies the EQ predicate on the "demo_expired" field.
func DemoExpiredEQ(v bool) predicate.Match {
return predicate.Match(func(s *sql.Selector) {
s.Where(sql.EQ(s.C(FieldDemoExpired), v))
})
}
// DemoExpiredNEQ applies the NEQ predicate on the "demo_expired" field.
func DemoExpiredNEQ(v bool) predicate.Match {
return predicate.Match(func(s *sql.Selector) {
s.Where(sql.NEQ(s.C(FieldDemoExpired), v))
})
}
// DemoParsedEQ applies the EQ predicate on the "demo_parsed" field. // DemoParsedEQ applies the EQ predicate on the "demo_parsed" field.
func DemoParsedEQ(v bool) predicate.Match { func DemoParsedEQ(v bool) predicate.Match {
return predicate.Match(func(s *sql.Selector) { return predicate.Match(func(s *sql.Selector) {

View File

@@ -92,20 +92,6 @@ func (mc *MatchCreate) SetMaxRounds(i int) *MatchCreate {
return mc return mc
} }
// SetDemoExpired sets the "demo_expired" field.
func (mc *MatchCreate) SetDemoExpired(b bool) *MatchCreate {
mc.mutation.SetDemoExpired(b)
return mc
}
// SetNillableDemoExpired sets the "demo_expired" field if the given value is not nil.
func (mc *MatchCreate) SetNillableDemoExpired(b *bool) *MatchCreate {
if b != nil {
mc.SetDemoExpired(*b)
}
return mc
}
// SetDemoParsed sets the "demo_parsed" field. // SetDemoParsed sets the "demo_parsed" field.
func (mc *MatchCreate) SetDemoParsed(b bool) *MatchCreate { func (mc *MatchCreate) SetDemoParsed(b bool) *MatchCreate {
mc.mutation.SetDemoParsed(b) mc.mutation.SetDemoParsed(b)
@@ -253,10 +239,6 @@ func (mc *MatchCreate) ExecX(ctx context.Context) {
// defaults sets the default values of the builder before save. // defaults sets the default values of the builder before save.
func (mc *MatchCreate) defaults() { func (mc *MatchCreate) defaults() {
if _, ok := mc.mutation.DemoExpired(); !ok {
v := match.DefaultDemoExpired
mc.mutation.SetDemoExpired(v)
}
if _, ok := mc.mutation.DemoParsed(); !ok { if _, ok := mc.mutation.DemoParsed(); !ok {
v := match.DefaultDemoParsed v := match.DefaultDemoParsed
mc.mutation.SetDemoParsed(v) mc.mutation.SetDemoParsed(v)
@@ -286,9 +268,6 @@ func (mc *MatchCreate) check() error {
if _, ok := mc.mutation.MaxRounds(); !ok { if _, ok := mc.mutation.MaxRounds(); !ok {
return &ValidationError{Name: "max_rounds", err: errors.New(`ent: missing required field "max_rounds"`)} return &ValidationError{Name: "max_rounds", err: errors.New(`ent: missing required field "max_rounds"`)}
} }
if _, ok := mc.mutation.DemoExpired(); !ok {
return &ValidationError{Name: "demo_expired", err: errors.New(`ent: missing required field "demo_expired"`)}
}
if _, ok := mc.mutation.DemoParsed(); !ok { if _, ok := mc.mutation.DemoParsed(); !ok {
return &ValidationError{Name: "demo_parsed", err: errors.New(`ent: missing required field "demo_parsed"`)} return &ValidationError{Name: "demo_parsed", err: errors.New(`ent: missing required field "demo_parsed"`)}
} }
@@ -397,14 +376,6 @@ func (mc *MatchCreate) createSpec() (*Match, *sqlgraph.CreateSpec) {
}) })
_node.MaxRounds = value _node.MaxRounds = value
} }
if value, ok := mc.mutation.DemoExpired(); ok {
_spec.Fields = append(_spec.Fields, &sqlgraph.FieldSpec{
Type: field.TypeBool,
Value: value,
Column: match.FieldDemoExpired,
})
_node.DemoExpired = value
}
if value, ok := mc.mutation.DemoParsed(); ok { if value, ok := mc.mutation.DemoParsed(); ok {
_spec.Fields = append(_spec.Fields, &sqlgraph.FieldSpec{ _spec.Fields = append(_spec.Fields, &sqlgraph.FieldSpec{
Type: field.TypeBool, Type: field.TypeBool,

View File

@@ -146,20 +146,6 @@ func (mu *MatchUpdate) AddMaxRounds(i int) *MatchUpdate {
return mu return mu
} }
// SetDemoExpired sets the "demo_expired" field.
func (mu *MatchUpdate) SetDemoExpired(b bool) *MatchUpdate {
mu.mutation.SetDemoExpired(b)
return mu
}
// SetNillableDemoExpired sets the "demo_expired" field if the given value is not nil.
func (mu *MatchUpdate) SetNillableDemoExpired(b *bool) *MatchUpdate {
if b != nil {
mu.SetDemoExpired(*b)
}
return mu
}
// SetDemoParsed sets the "demo_parsed" field. // SetDemoParsed sets the "demo_parsed" field.
func (mu *MatchUpdate) SetDemoParsed(b bool) *MatchUpdate { func (mu *MatchUpdate) SetDemoParsed(b bool) *MatchUpdate {
mu.mutation.SetDemoParsed(b) mu.mutation.SetDemoParsed(b)
@@ -465,13 +451,6 @@ func (mu *MatchUpdate) sqlSave(ctx context.Context) (n int, err error) {
Column: match.FieldMaxRounds, Column: match.FieldMaxRounds,
}) })
} }
if value, ok := mu.mutation.DemoExpired(); ok {
_spec.Fields.Set = append(_spec.Fields.Set, &sqlgraph.FieldSpec{
Type: field.TypeBool,
Value: value,
Column: match.FieldDemoExpired,
})
}
if value, ok := mu.mutation.DemoParsed(); ok { if value, ok := mu.mutation.DemoParsed(); ok {
_spec.Fields.Set = append(_spec.Fields.Set, &sqlgraph.FieldSpec{ _spec.Fields.Set = append(_spec.Fields.Set, &sqlgraph.FieldSpec{
Type: field.TypeBool, Type: field.TypeBool,
@@ -736,20 +715,6 @@ func (muo *MatchUpdateOne) AddMaxRounds(i int) *MatchUpdateOne {
return muo return muo
} }
// SetDemoExpired sets the "demo_expired" field.
func (muo *MatchUpdateOne) SetDemoExpired(b bool) *MatchUpdateOne {
muo.mutation.SetDemoExpired(b)
return muo
}
// SetNillableDemoExpired sets the "demo_expired" field if the given value is not nil.
func (muo *MatchUpdateOne) SetNillableDemoExpired(b *bool) *MatchUpdateOne {
if b != nil {
muo.SetDemoExpired(*b)
}
return muo
}
// SetDemoParsed sets the "demo_parsed" field. // SetDemoParsed sets the "demo_parsed" field.
func (muo *MatchUpdateOne) SetDemoParsed(b bool) *MatchUpdateOne { func (muo *MatchUpdateOne) SetDemoParsed(b bool) *MatchUpdateOne {
muo.mutation.SetDemoParsed(b) muo.mutation.SetDemoParsed(b)
@@ -1079,13 +1044,6 @@ func (muo *MatchUpdateOne) sqlSave(ctx context.Context) (_node *Match, err error
Column: match.FieldMaxRounds, Column: match.FieldMaxRounds,
}) })
} }
if value, ok := muo.mutation.DemoExpired(); ok {
_spec.Fields.Set = append(_spec.Fields.Set, &sqlgraph.FieldSpec{
Type: field.TypeBool,
Value: value,
Column: match.FieldDemoExpired,
})
}
if value, ok := muo.mutation.DemoParsed(); ok { if value, ok := muo.mutation.DemoParsed(); ok {
_spec.Fields.Set = append(_spec.Fields.Set, &sqlgraph.FieldSpec{ _spec.Fields.Set = append(_spec.Fields.Set, &sqlgraph.FieldSpec{
Type: field.TypeBool, Type: field.TypeBool,

View File

@@ -20,7 +20,6 @@ var (
{Name: "duration", Type: field.TypeInt}, {Name: "duration", Type: field.TypeInt},
{Name: "match_result", Type: field.TypeInt}, {Name: "match_result", Type: field.TypeInt},
{Name: "max_rounds", Type: field.TypeInt}, {Name: "max_rounds", Type: field.TypeInt},
{Name: "demo_expired", Type: field.TypeBool, Default: false},
{Name: "demo_parsed", Type: field.TypeBool, Default: false}, {Name: "demo_parsed", Type: field.TypeBool, Default: false},
{Name: "eco", Type: field.TypeJSON, Nullable: true}, {Name: "eco", Type: field.TypeJSON, Nullable: true},
} }

View File

@@ -49,7 +49,6 @@ type MatchMutation struct {
addmatch_result *int addmatch_result *int
max_rounds *int max_rounds *int
addmax_rounds *int addmax_rounds *int
demo_expired *bool
demo_parsed *bool demo_parsed *bool
eco *struct { eco *struct {
Rounds []*struct { Rounds []*struct {
@@ -605,42 +604,6 @@ func (m *MatchMutation) ResetMaxRounds() {
m.addmax_rounds = nil m.addmax_rounds = nil
} }
// SetDemoExpired sets the "demo_expired" field.
func (m *MatchMutation) SetDemoExpired(b bool) {
m.demo_expired = &b
}
// DemoExpired returns the value of the "demo_expired" field in the mutation.
func (m *MatchMutation) DemoExpired() (r bool, exists bool) {
v := m.demo_expired
if v == nil {
return
}
return *v, true
}
// OldDemoExpired returns the old "demo_expired" field's value of the Match entity.
// If the Match object wasn't provided to the builder, the object is fetched from the database.
// An error is returned if the mutation operation is not UpdateOne, or the database query fails.
func (m *MatchMutation) OldDemoExpired(ctx context.Context) (v bool, err error) {
if !m.op.Is(OpUpdateOne) {
return v, fmt.Errorf("OldDemoExpired is only allowed on UpdateOne operations")
}
if m.id == nil || m.oldValue == nil {
return v, fmt.Errorf("OldDemoExpired requires an ID field in the mutation")
}
oldValue, err := m.oldValue(ctx)
if err != nil {
return v, fmt.Errorf("querying old value for OldDemoExpired: %w", err)
}
return oldValue.DemoExpired, nil
}
// ResetDemoExpired resets all changes to the "demo_expired" field.
func (m *MatchMutation) ResetDemoExpired() {
m.demo_expired = nil
}
// SetDemoParsed sets the "demo_parsed" field. // SetDemoParsed sets the "demo_parsed" field.
func (m *MatchMutation) SetDemoParsed(b bool) { func (m *MatchMutation) SetDemoParsed(b bool) {
m.demo_parsed = &b m.demo_parsed = &b
@@ -871,7 +834,7 @@ func (m *MatchMutation) Type() string {
// order to get all numeric fields that were incremented/decremented, call // order to get all numeric fields that were incremented/decremented, call
// AddedFields(). // AddedFields().
func (m *MatchMutation) Fields() []string { func (m *MatchMutation) Fields() []string {
fields := make([]string, 0, 12) fields := make([]string, 0, 11)
if m.share_code != nil { if m.share_code != nil {
fields = append(fields, match.FieldShareCode) fields = append(fields, match.FieldShareCode)
} }
@@ -899,9 +862,6 @@ func (m *MatchMutation) Fields() []string {
if m.max_rounds != nil { if m.max_rounds != nil {
fields = append(fields, match.FieldMaxRounds) fields = append(fields, match.FieldMaxRounds)
} }
if m.demo_expired != nil {
fields = append(fields, match.FieldDemoExpired)
}
if m.demo_parsed != nil { if m.demo_parsed != nil {
fields = append(fields, match.FieldDemoParsed) fields = append(fields, match.FieldDemoParsed)
} }
@@ -934,8 +894,6 @@ func (m *MatchMutation) Field(name string) (ent.Value, bool) {
return m.MatchResult() return m.MatchResult()
case match.FieldMaxRounds: case match.FieldMaxRounds:
return m.MaxRounds() return m.MaxRounds()
case match.FieldDemoExpired:
return m.DemoExpired()
case match.FieldDemoParsed: case match.FieldDemoParsed:
return m.DemoParsed() return m.DemoParsed()
case match.FieldEco: case match.FieldEco:
@@ -967,8 +925,6 @@ func (m *MatchMutation) OldField(ctx context.Context, name string) (ent.Value, e
return m.OldMatchResult(ctx) return m.OldMatchResult(ctx)
case match.FieldMaxRounds: case match.FieldMaxRounds:
return m.OldMaxRounds(ctx) return m.OldMaxRounds(ctx)
case match.FieldDemoExpired:
return m.OldDemoExpired(ctx)
case match.FieldDemoParsed: case match.FieldDemoParsed:
return m.OldDemoParsed(ctx) return m.OldDemoParsed(ctx)
case match.FieldEco: case match.FieldEco:
@@ -1045,13 +1001,6 @@ func (m *MatchMutation) SetField(name string, value ent.Value) error {
} }
m.SetMaxRounds(v) m.SetMaxRounds(v)
return nil return nil
case match.FieldDemoExpired:
v, ok := value.(bool)
if !ok {
return fmt.Errorf("unexpected type %T for field %s", value, name)
}
m.SetDemoExpired(v)
return nil
case match.FieldDemoParsed: case match.FieldDemoParsed:
v, ok := value.(bool) v, ok := value.(bool)
if !ok { if !ok {
@@ -1232,9 +1181,6 @@ func (m *MatchMutation) ResetField(name string) error {
case match.FieldMaxRounds: case match.FieldMaxRounds:
m.ResetMaxRounds() m.ResetMaxRounds()
return nil return nil
case match.FieldDemoExpired:
m.ResetDemoExpired()
return nil
case match.FieldDemoParsed: case match.FieldDemoParsed:
m.ResetDemoParsed() m.ResetDemoParsed()
return nil return nil

View File

@@ -15,12 +15,8 @@ import (
func init() { func init() {
matchFields := schema.Match{}.Fields() matchFields := schema.Match{}.Fields()
_ = matchFields _ = matchFields
// matchDescDemoExpired is the schema descriptor for demo_expired field.
matchDescDemoExpired := matchFields[10].Descriptor()
// match.DefaultDemoExpired holds the default value on creation for the demo_expired field.
match.DefaultDemoExpired = matchDescDemoExpired.Default.(bool)
// matchDescDemoParsed is the schema descriptor for demo_parsed field. // matchDescDemoParsed is the schema descriptor for demo_parsed field.
matchDescDemoParsed := matchFields[11].Descriptor() matchDescDemoParsed := matchFields[10].Descriptor()
// match.DefaultDemoParsed holds the default value on creation for the demo_parsed field. // match.DefaultDemoParsed holds the default value on creation for the demo_parsed field.
match.DefaultDemoParsed = matchDescDemoParsed.Default.(bool) match.DefaultDemoParsed = matchDescDemoParsed.Default.(bool)
playerFields := schema.Player{}.Fields() playerFields := schema.Player{}.Fields()

View File

@@ -24,7 +24,6 @@ func (Match) Fields() []ent.Field {
field.Int("duration"), field.Int("duration"),
field.Int("match_result"), field.Int("match_result"),
field.Int("max_rounds"), field.Int("max_rounds"),
field.Bool("demo_expired").Default(false),
field.Bool("demo_parsed").Default(false), field.Bool("demo_parsed").Default(false),
field.JSON("eco", struct { field.JSON("eco", struct {
Rounds []*struct { Rounds []*struct {

69
main.go
View File

@@ -37,8 +37,6 @@ var (
db *utils.DBWithLock db *utils.DBWithLock
rdb *redis.Client rdb *redis.Client
rdc *cache.Cache rdc *cache.Cache
sendGC chan *csgo.Demo
demoParser = &csgo.DemoParser{}
firstHK = true firstHK = true
rL ratelimit.Limiter rL ratelimit.Limiter
configFlag = flag.String("config", "config.yaml", "Set config to use") configFlag = flag.String("config", "config.yaml", "Set config to use")
@@ -89,6 +87,7 @@ func housekeeping() {
} }
firstHK = false firstHK = false
// update players from steam
db.Lock.RLock() db.Lock.RLock()
tPlayerNeedSteamUpdate, err := db.Client.Player.Query().Where( tPlayerNeedSteamUpdate, err := db.Client.Player.Query().Where(
player.SteamUpdatedLTE(time.Now().UTC().AddDate(0, 0, -1)), player.SteamUpdatedLTE(time.Now().UTC().AddDate(0, 0, -1)),
@@ -103,6 +102,7 @@ func housekeeping() {
_, err = utils.UpdatePlayerFromSteam(tPlayer, conf.Steam.APIKey, db.Lock, rL) _, err = utils.UpdatePlayerFromSteam(tPlayer, conf.Steam.APIKey, db.Lock, rL)
} }
// getting new sharecodes
if !demoLoader.GCReady { if !demoLoader.GCReady {
log.Warningf("[HK] GC not ready, skipping sharecode refresh") log.Warningf("[HK] GC not ready, skipping sharecode refresh")
continue continue
@@ -131,11 +131,28 @@ func housekeeping() {
} }
for _, code := range shareCodes { for _, code := range shareCodes {
sendGC <- &csgo.Demo{ err := demoLoader.LoadDemo(&csgo.Demo{
ShareCode: code, ShareCode: code,
})
if err != nil {
log.Warningf("[HK] Failure to queue match: %v", err)
} }
} }
} }
// try parsing demos not parsed
tMatches, err := db.Client.Match.Query().Where(match.And(match.DateGT(time.Now().UTC().AddDate(0, 0, -30)), match.DemoParsed(false))).All(context.Background())
if err != nil {
log.Warningf("[HK] Failure getting matches to retry parsing: %v", err)
continue
}
for _, m := range tMatches {
err := demoLoader.LoadDemo(&csgo.Demo{MatchId: m.ID, ShareCode: m.ShareCode})
if err != nil {
log.Warningf("[HK] Failure trying to parse match %d: %v", m.ID, err)
}
}
} }
} }
@@ -294,7 +311,12 @@ func postPlayerTrackMe(w http.ResponseWriter, r *http.Request) {
} }
if shareCode != "" && utils.ShareCodeRegEx.MatchString(shareCode) { if shareCode != "" && utils.ShareCodeRegEx.MatchString(shareCode) {
sendGC <- &csgo.Demo{ShareCode: shareCode} err := demoLoader.LoadDemo(&csgo.Demo{ShareCode: shareCode})
if err != nil {
log.Warningf("[PPTM] unable to queue match: %v", err)
w.WriteHeader(http.StatusServiceUnavailable)
return
}
} }
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
@@ -310,8 +332,13 @@ func getMatchParse(w http.ResponseWriter, r *http.Request) {
return return
} }
sendGC <- &csgo.Demo{ err := demoLoader.LoadDemo(&csgo.Demo{
ShareCode: shareCode, ShareCode: shareCode,
})
if err != nil {
log.Warningf("[PPTM] unable to queue match: %v", err)
w.WriteHeader(http.StatusServiceUnavailable)
return
} }
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
@@ -407,13 +434,19 @@ func main() {
flag.Parse() flag.Parse()
confStr, err := os.ReadFile(*configFlag) confStr, err := os.ReadFile(*configFlag)
utils.Check(err) if err != nil {
log.Fatalf("Unable to open config: %v", err)
}
err = yaml.Unmarshal(confStr, &conf) err = yaml.Unmarshal(confStr, &conf)
utils.Check(err) if err != nil {
log.Fatalf("Unable to parse config: %v", err)
}
lvl, err := log.ParseLevel(conf.Logging.Level) lvl, err := log.ParseLevel(conf.Logging.Level)
utils.Check(err) if err != nil {
log.Fatalf("Failure setting logging level: %v", err)
}
log.SetLevel(lvl) log.SetLevel(lvl)
if *journalLogFlag { if *journalLogFlag {
journalhook.Enable() journalhook.Enable()
@@ -428,7 +461,7 @@ func main() {
log.Panicf("Failed to open database %s: %v", "opencsgo.db", err) log.Panicf("Failed to open database %s: %v", "opencsgo.db", err)
} }
defer func(dbSQLite *ent.Client) { defer func(dbSQLite *ent.Client) {
utils.Check(dbSQLite.Close()) _ = dbSQLite.Close()
}(db.Client) }(db.Client)
if err := db.Client.Schema.Create(context.Background(), migrate.WithDropIndex(true), migrate.WithDropColumn(true)); err != nil { if err := db.Client.Schema.Create(context.Background(), migrate.WithDropIndex(true), migrate.WithDropColumn(true)); err != nil {
@@ -447,8 +480,21 @@ func main() {
}) })
rL = ratelimit.New(conf.Steam.RatePerSecond) rL = ratelimit.New(conf.Steam.RatePerSecond)
// setup GC // setup GC
err = demoLoader.Setup(conf.Steam.Username, conf.Steam.Password, *authCodeFlag, conf.Steam.Sentry, conf.Steam.LoginKey, conf.Steam.ServerList) err = demoLoader.Setup(&csgo.DemoMatchLoaderConfig{
Username: conf.Steam.Username,
Password: conf.Steam.Password,
AuthCode: *authCodeFlag,
Sentry: conf.Steam.Sentry,
LoginKey: conf.Steam.LoginKey,
ServerList: conf.Steam.ServerList,
Db: db.Client,
Lock: db.Lock,
Worker: conf.Parser.Worker,
ApiKey: conf.Steam.APIKey,
RateLimit: rL,
})
if err != nil { if err != nil {
log.Fatalf("Unbale to setup DemoLoader: %v", err) log.Fatalf("Unbale to setup DemoLoader: %v", err)
} }
@@ -459,9 +505,6 @@ func main() {
} }
log.Info("GC ready, starting HTTP server") log.Info("GC ready, starting HTTP server")
sendGC = make(chan *csgo.Demo, 100)
utils.Check(demoParser.Setup(db.Client, db.Lock, conf.Parser.Worker))
go utils.GCInfoParser(sendGC, demoLoader, demoParser, db, conf.Steam.APIKey, rL)
go housekeeping() go housekeeping()
// Define routes // Define routes

View File

@@ -2,7 +2,6 @@ package utils
import ( import (
"context" "context"
"csgowtfd/csgo"
"csgowtfd/ent" "csgowtfd/ent"
"csgowtfd/ent/match" "csgowtfd/ent/match"
"csgowtfd/ent/player" "csgowtfd/ent/player"
@@ -232,7 +231,9 @@ func getNextShareCode(lastCode string, apiKey string, authCode string, steamId u
return "", fmt.Errorf("bad response from steam api (HTTP %d)", r.StatusCode) return "", fmt.Errorf("bad response from steam api (HTTP %d)", r.StatusCode)
} }
defer r.Body.Close() defer func(Body io.ReadCloser) {
_ = Body.Close()
}(r.Body)
bJson, err := ioutil.ReadAll(r.Body) bJson, err := ioutil.ReadAll(r.Body)
if err != nil { if err != nil {
return "", err return "", err
@@ -319,104 +320,6 @@ func GetPlayerFromSteamID64(db *DBWithLock, steamID uint64, apiKey string, rl ra
} }
} }
func GCInfoParser(channel chan *csgo.Demo, dl *csgo.DemoMatchLoader, dp *csgo.DemoParser, db *DBWithLock, apiKey string, rl ratelimit.Limiter) {
for {
select {
case demo := <-channel:
if !dl.GCReady {
time.Sleep(5 * time.Second)
channel <- demo
}
matchId, _, _, err := csgo.DecodeSharecode(demo.ShareCode)
Check(err)
if matchId == 0 {
log.Warningf("Can't parse match with sharecode %s", demo.ShareCode)
continue
}
db.Lock.RLock()
iMatch, err := db.Client.Match.Query().Where(match.ID(matchId)).Only(context.Background())
db.Lock.RUnlock()
if err != nil {
switch e := err.(type) {
case *ent.NotFoundError:
break
default:
Check(e)
}
} else {
if iMatch.DemoParsed == false && !iMatch.DemoExpired {
log.Infof("Match %d is loaded, but not parsed. Try parsing.", demo.MatchId)
demo.MatchId = matchId
demo.Url = iMatch.ReplayURL
dp.ParseDemo(demo)
continue
}
log.Debugf("Skipped match %d: already parsed", matchId)
continue
}
matchDetails, err := dl.GetMatchDetails(demo.ShareCode)
Check(err)
matchZero := matchDetails.GetMatches()[0]
lastRound := matchZero.GetRoundstatsall()[len(matchZero.Roundstatsall)-1]
var players []*ent.Player
for _, accountId := range lastRound.GetReservation().GetAccountIds() {
tPlayer, err := GetPlayer(db, csgo.AccountId2SteamId(accountId), apiKey, rl)
Check(err)
players = append(players, tPlayer)
}
demo.Url = lastRound.GetMap()
demo.MatchId = matchZero.GetMatchid()
db.Lock.Lock()
tMatch, err := db.Client.Match.Create().
SetID(matchZero.GetMatchid()).
AddPlayers(players...).
SetDate(time.Unix(int64(matchZero.GetMatchtime()), 0).UTC()).
SetMaxRounds(int(lastRound.GetMaxRounds())).
SetDuration(int(lastRound.GetMatchDuration())).
SetShareCode(demo.ShareCode).
SetReplayURL(lastRound.GetMap()).
SetScoreTeamA(int(lastRound.GetTeamScores()[0])).
SetScoreTeamB(int(lastRound.GetTeamScores()[1])).
SetMatchResult(int(lastRound.GetMatchResult())).
Save(context.Background())
db.Lock.Unlock()
Check(err)
for i, mPlayer := range players {
var teamId int
if i > 4 {
teamId = 2
} else {
teamId = 1
}
db.Lock.Lock()
err := db.Client.Stats.Create().
SetMatches(tMatch).
SetPlayers(mPlayer).
SetTeamID(teamId).
SetKills(int(lastRound.GetKills()[i])).
SetDeaths(int(lastRound.GetDeaths()[i])).
SetAssists(int(lastRound.GetAssists()[i])).
SetMvp(int(lastRound.GetMvps()[i])).
SetScore(int(lastRound.GetScores()[i])).
SetHeadshot(int(lastRound.GetEnemyHeadshots()[i])).
Exec(context.Background())
db.Lock.Unlock()
Check(err)
}
dp.ParseDemo(demo)
}
}
}
func SteamProfile2XML(id string, steamID64 uint64) (*CommunityXML, error) { func SteamProfile2XML(id string, steamID64 uint64) (*CommunityXML, error) {
var r *http.Response var r *http.Response
var err error var err error
@@ -425,10 +328,11 @@ func SteamProfile2XML(id string, steamID64 uint64) (*CommunityXML, error) {
} else { } else {
r, err = http.Get(fmt.Sprintf(steamVanityURLEntry, id)) r, err = http.Get(fmt.Sprintf(steamVanityURLEntry, id))
} }
Check(err) if err != nil {
return nil, err
}
defer func(Body io.ReadCloser) { defer func(Body io.ReadCloser) {
err := Body.Close() _ = Body.Close()
Check(err)
}(r.Body) }(r.Body)
body, err := ioutil.ReadAll(r.Body) body, err := ioutil.ReadAll(r.Body)
@@ -480,9 +384,3 @@ func UpdatePlayerFromSteam(player *ent.Player, apiKey string, lock *sync.RWMutex
return tPlayer, nil return tPlayer, nil
} }
func Check(e error) {
if e != nil {
panic(e)
}
}