diff --git a/csgo/demo_loader.go b/csgo/demo_loader.go index 9c1fed7..c3517ba 100644 --- a/csgo/demo_loader.go +++ b/csgo/demo_loader.go @@ -1,6 +1,10 @@ package csgo import ( + "context" + "csgowtfd/ent" + "csgowtfd/ent/match" + "csgowtfd/utils" "encoding/json" "fmt" "github.com/an0nfunc/go-steam/v3" @@ -9,10 +13,12 @@ import ( "github.com/an0nfunc/go-steam/v3/protocol/gamecoordinator" "github.com/an0nfunc/go-steam/v3/protocol/steamlang" log "github.com/sirupsen/logrus" + "go.uber.org/ratelimit" "google.golang.org/protobuf/proto" "io/ioutil" "math/rand" "os" + "sync" "time" ) @@ -21,6 +27,20 @@ const ( APPID = 730 ) +type DemoMatchLoaderConfig struct { + Username string + Password string + AuthCode string + Sentry string + LoginKey string + ServerList string + Db *ent.Client + Lock *sync.RWMutex + Worker int + ApiKey string + RateLimit ratelimit.Limiter +} + type DemoMatchLoader struct { client *steam.Client GCReady bool @@ -30,6 +50,10 @@ type DemoMatchLoader struct { sentryFile string loginKey string serverList string + db *ent.Client + lock *sync.RWMutex + dp *DemoParser + parseDemo chan *Demo } func AccountId2SteamId(accId uint32) uint64 { @@ -82,7 +106,7 @@ func (d *DemoMatchLoader) HandleGCPacket(pkg *gamecoordinator.GCPacket) { } } -func (d *DemoMatchLoader) GetMatchDetails(sharecode string) (*protobuf.CMsgGCCStrike15V2_MatchList, error) { +func (d *DemoMatchLoader) getMatchDetails(sharecode string) (*protobuf.CMsgGCCStrike15V2_MatchList, error) { if !d.GCReady { return nil, fmt.Errorf("gc not ready") } @@ -91,18 +115,18 @@ func (d *DemoMatchLoader) GetMatchDetails(sharecode string) (*protobuf.CMsgGCCSt if err != nil { return nil, err } - err = d.RequestDemoInfo(matchId, outcomeId, tokenId) + err = d.requestDemoInfo(matchId, outcomeId, tokenId) if err != nil { return nil, err } for { select { - case match := <-d.matchRecv: - if *match.Matches[0].Matchid == matchId { - return match, nil + case matchDetails := <-d.matchRecv: + if *matchDetails.Matches[0].Matchid == matchId { + return matchDetails, nil } else { - d.matchRecv <- match + d.matchRecv <- matchDetails } } } @@ -127,15 +151,15 @@ func (d *DemoMatchLoader) connectToSteam() error { return nil } -func (d *DemoMatchLoader) Setup(username string, password string, authCode string, sentry string, loginKey string, serverList string) error { - d.loginKey = loginKey - d.sentryFile = sentry - d.serverList = serverList +func (d *DemoMatchLoader) Setup(config *DemoMatchLoaderConfig) error { + d.loginKey = config.LoginKey + d.sentryFile = config.Sentry + d.serverList = config.ServerList d.steamLogin = new(steam.LogOnDetails) - d.steamLogin.Username = username - d.steamLogin.Password = password - d.steamLogin.AuthCode = authCode + d.steamLogin.Username = config.Username + d.steamLogin.Password = config.Password + d.steamLogin.AuthCode = config.AuthCode d.steamLogin.ShouldRememberPassword = true if _, err := os.Stat(d.sentryFile); err == nil { @@ -166,14 +190,28 @@ func (d *DemoMatchLoader) Setup(username string, password string, authCode strin } d.client = steam.NewClient() - d.matchRecv = make(chan *protobuf.CMsgGCCStrike15V2_MatchList, 500) + d.matchRecv = make(chan *protobuf.CMsgGCCStrike15V2_MatchList, 1000) + d.parseDemo = make(chan *Demo, 1000) go d.connectLoop() go d.steamEventHandler() + for i := 0; i < config.Worker; i++ { + go d.gcWorker(config.ApiKey, config.RateLimit) + } + return nil } +func (d DemoMatchLoader) LoadDemo(demo *Demo) error { + select { + case d.parseDemo <- demo: + return nil + default: + return fmt.Errorf("queue full") + } +} + func (d DemoMatchLoader) connectLoop() { for d.connectToSteam() != nil { log.Infof("Retrying connecting to steam") @@ -207,15 +245,15 @@ func (d *DemoMatchLoader) steamEventHandler() { case *steam.LoggedOnEvent: log.Debug("[DL] Login successfully!") d.client.Social.SetPersonaState(steamlang.EPersonaState_Online) - go d.SetPlaying() + go d.setPlaying() case *steam.LogOnFailedEvent: log.Warningf("[DL] Steam login denied: %+v", e) switch e.Result { case steamlang.EResult_AccountLogonDenied: log.Fatalf("[DL] Please provide AuthCode with --authcode") case steamlang.EResult_InvalidPassword: - os.Remove(d.sentryFile) - os.Remove(d.loginKey) + _ = os.Remove(d.sentryFile) + _ = os.Remove(d.loginKey) log.Fatalf("[DL] Steam login wrong") case steamlang.EResult_InvalidLoginAuthCode: log.Fatalf("[DL] Steam auth code wrong") @@ -242,7 +280,7 @@ func (d *DemoMatchLoader) steamEventHandler() { } } -func (d *DemoMatchLoader) SetPlaying() { +func (d *DemoMatchLoader) setPlaying() { d.client.GC.SetGamesPlayed(APPID) d.client.GC.RegisterPacketHandler(d) go d.greetGC() @@ -257,7 +295,7 @@ func (d *DemoMatchLoader) greetGC() { } } -func (d *DemoMatchLoader) RequestDemoInfo(matchId uint64, conclusionId uint64, tokenId uint32) error { +func (d *DemoMatchLoader) requestDemoInfo(matchId uint64, conclusionId uint64, tokenId uint32) error { if !d.GCReady { return fmt.Errorf("gc not ready") } @@ -270,3 +308,121 @@ func (d *DemoMatchLoader) RequestDemoInfo(matchId uint64, conclusionId uint64, t return nil } + +func (d *DemoMatchLoader) gcWorker(apiKey string, rl ratelimit.Limiter) { + for { + select { + case demo := <-d.parseDemo: + if !d.GCReady { + time.Sleep(5 * time.Second) + d.parseDemo <- demo + continue + } + + matchId, _, _, err := DecodeSharecode(demo.ShareCode) + if err != nil || matchId == 0 { + log.Warningf("[DL] Can't parse match with sharecode %s: %v", demo.ShareCode, err) + continue + } + + d.lock.RLock() + iMatch, err := d.db.Match.Query().Where(match.ID(matchId)).Only(context.Background()) + d.lock.RUnlock() + if err != nil { + switch e := err.(type) { + case *ent.NotFoundError: + break + default: + log.Errorf("[DL] Failure trying to find match %d in db: %v", matchId, e) + } + } else { + if iMatch.DemoParsed == false && iMatch.Date.Before(time.Now().UTC().AddDate(0, 0, -30)) { + log.Infof("[DL] Match %d is loaded, but not parsed. Try parsing.", demo.MatchId) + demo.MatchId = matchId + demo.Url = iMatch.ReplayURL + err := d.dp.ParseDemo(demo) + if err != nil { + log.Warningf("[DL] Parsing demo from match %d failed: %v", demo.MatchId, err) + } + continue + } + + log.Debugf("[DL] Skipped match %d: already parsed", matchId) + continue + } + + matchDetails, err := d.getMatchDetails(demo.ShareCode) + if err != nil { + log.Warningf("[DL] Failure to get match-details for %d from GC: %v", demo.MatchId, err) + continue + } + matchZero := matchDetails.GetMatches()[0] + lastRound := matchZero.GetRoundstatsall()[len(matchZero.Roundstatsall)-1] + var players []*ent.Player + + for _, accountId := range lastRound.GetReservation().GetAccountIds() { + tPlayer, err := utils.GetPlayer(&utils.DBWithLock{ + Client: d.db, + Lock: d.lock, + }, AccountId2SteamId(accountId), apiKey, rl) + if err != nil { + log.Warningf("[DL] Unable to get player for steamid %d: %v", AccountId2SteamId(accountId), err) + continue + } + players = append(players, tPlayer) + } + + demo.Url = lastRound.GetMap() + demo.MatchId = matchZero.GetMatchid() + + d.lock.Lock() + tMatch, err := d.db.Match.Create(). + SetID(matchZero.GetMatchid()). + AddPlayers(players...). + SetDate(time.Unix(int64(matchZero.GetMatchtime()), 0).UTC()). + SetMaxRounds(int(lastRound.GetMaxRounds())). + SetDuration(int(lastRound.GetMatchDuration())). + SetShareCode(demo.ShareCode). + SetReplayURL(lastRound.GetMap()). + SetScoreTeamA(int(lastRound.GetTeamScores()[0])). + SetScoreTeamB(int(lastRound.GetTeamScores()[1])). + SetMatchResult(int(lastRound.GetMatchResult())). + Save(context.Background()) + d.lock.Unlock() + if err != nil { + log.Warningf("[DL] Unable to create match %d: %v", matchZero.GetMatchid(), err) + continue + } + + for i, mPlayer := range players { + var teamId int + if i > 4 { + teamId = 2 + } else { + teamId = 1 + } + d.lock.Lock() + err := d.db.Stats.Create(). + SetMatches(tMatch). + SetPlayers(mPlayer). + SetTeamID(teamId). + SetKills(int(lastRound.GetKills()[i])). + SetDeaths(int(lastRound.GetDeaths()[i])). + SetAssists(int(lastRound.GetAssists()[i])). + SetMvp(int(lastRound.GetMvps()[i])). + SetScore(int(lastRound.GetScores()[i])). + SetHeadshot(int(lastRound.GetEnemyHeadshots()[i])). + Exec(context.Background()) + d.lock.Unlock() + if err != nil { + log.Warningf("[DL] Unable to create stats for player %d in match %d: %v", mPlayer.ID, tMatch.ID, err) + } + } + + err = d.dp.ParseDemo(demo) + if err != nil { + log.Warningf("[DL] Can't queue demo %d for parsing: %v", demo.MatchId, err) + } + } + } +} diff --git a/csgo/demo_parser.go b/csgo/demo_parser.go index 0b25dbd..60994f9 100644 --- a/csgo/demo_parser.go +++ b/csgo/demo_parser.go @@ -21,9 +21,6 @@ type Demo struct { ShareCode string MatchId uint64 Url string - Rank int - Tickrate int - File string } type DemoParser struct { @@ -128,14 +125,7 @@ func (p *DemoParser) parseWorker() { if err != nil { switch e := err.(type) { case DemoNotFoundError: - p.lock.Lock() - err := tMatch.Update().SetDemoExpired(true).Exec(context.Background()) - p.lock.Unlock() - if err != nil { - log.Errorf("[DP] Unable to set demo expire for match %d: %v", demo.MatchId, e) - continue - } - log.Warningf("[DP] Demo already expired for %d", demo.MatchId) + log.Warningf("[DP] Demo not found for %d. Maybe temp.?", demo.MatchId) continue default: log.Warningf("[DP] Unable to download demo for %d: %v", demo.MatchId, e) diff --git a/ent/match.go b/ent/match.go index ac4559f..b5dd5e1 100644 --- a/ent/match.go +++ b/ent/match.go @@ -35,8 +35,6 @@ type Match struct { MatchResult int `json:"match_result,omitempty"` // MaxRounds holds the value of the "max_rounds" field. MaxRounds int `json:"max_rounds,omitempty"` - // DemoExpired holds the value of the "demo_expired" field. - DemoExpired bool `json:"demo_expired,omitempty"` // DemoParsed holds the value of the "demo_parsed" field. DemoParsed bool `json:"demo_parsed,omitempty"` // Eco holds the value of the "eco" field. @@ -88,7 +86,7 @@ func (*Match) scanValues(columns []string) ([]interface{}, error) { switch columns[i] { case match.FieldEco: values[i] = new([]byte) - case match.FieldDemoExpired, match.FieldDemoParsed: + case match.FieldDemoParsed: values[i] = new(sql.NullBool) case match.FieldID, match.FieldScoreTeamA, match.FieldScoreTeamB, match.FieldDuration, match.FieldMatchResult, match.FieldMaxRounds: values[i] = new(sql.NullInt64) @@ -171,12 +169,6 @@ func (m *Match) assignValues(columns []string, values []interface{}) error { } else if value.Valid { m.MaxRounds = int(value.Int64) } - case match.FieldDemoExpired: - if value, ok := values[i].(*sql.NullBool); !ok { - return fmt.Errorf("unexpected type %T for field demo_expired", values[i]) - } else if value.Valid { - m.DemoExpired = value.Bool - } case match.FieldDemoParsed: if value, ok := values[i].(*sql.NullBool); !ok { return fmt.Errorf("unexpected type %T for field demo_parsed", values[i]) @@ -247,8 +239,6 @@ func (m *Match) String() string { builder.WriteString(fmt.Sprintf("%v", m.MatchResult)) builder.WriteString(", max_rounds=") builder.WriteString(fmt.Sprintf("%v", m.MaxRounds)) - builder.WriteString(", demo_expired=") - builder.WriteString(fmt.Sprintf("%v", m.DemoExpired)) builder.WriteString(", demo_parsed=") builder.WriteString(fmt.Sprintf("%v", m.DemoParsed)) builder.WriteString(", eco=") diff --git a/ent/match/match.go b/ent/match/match.go index 01134f4..75582ca 100644 --- a/ent/match/match.go +++ b/ent/match/match.go @@ -25,8 +25,6 @@ const ( FieldMatchResult = "match_result" // FieldMaxRounds holds the string denoting the max_rounds field in the database. FieldMaxRounds = "max_rounds" - // FieldDemoExpired holds the string denoting the demo_expired field in the database. - FieldDemoExpired = "demo_expired" // FieldDemoParsed holds the string denoting the demo_parsed field in the database. FieldDemoParsed = "demo_parsed" // FieldEco holds the string denoting the eco field in the database. @@ -63,7 +61,6 @@ var Columns = []string{ FieldDuration, FieldMatchResult, FieldMaxRounds, - FieldDemoExpired, FieldDemoParsed, FieldEco, } @@ -85,8 +82,6 @@ func ValidColumn(column string) bool { } var ( - // DefaultDemoExpired holds the default value on creation for the "demo_expired" field. - DefaultDemoExpired bool // DefaultDemoParsed holds the default value on creation for the "demo_parsed" field. DefaultDemoParsed bool ) diff --git a/ent/match/where.go b/ent/match/where.go index 2a34c1e..65791cd 100644 --- a/ent/match/where.go +++ b/ent/match/where.go @@ -156,13 +156,6 @@ func MaxRounds(v int) predicate.Match { }) } -// DemoExpired applies equality check predicate on the "demo_expired" field. It's identical to DemoExpiredEQ. -func DemoExpired(v bool) predicate.Match { - return predicate.Match(func(s *sql.Selector) { - s.Where(sql.EQ(s.C(FieldDemoExpired), v)) - }) -} - // DemoParsed applies equality check predicate on the "demo_parsed" field. It's identical to DemoParsedEQ. func DemoParsed(v bool) predicate.Match { return predicate.Match(func(s *sql.Selector) { @@ -987,20 +980,6 @@ func MaxRoundsLTE(v int) predicate.Match { }) } -// DemoExpiredEQ applies the EQ predicate on the "demo_expired" field. -func DemoExpiredEQ(v bool) predicate.Match { - return predicate.Match(func(s *sql.Selector) { - s.Where(sql.EQ(s.C(FieldDemoExpired), v)) - }) -} - -// DemoExpiredNEQ applies the NEQ predicate on the "demo_expired" field. -func DemoExpiredNEQ(v bool) predicate.Match { - return predicate.Match(func(s *sql.Selector) { - s.Where(sql.NEQ(s.C(FieldDemoExpired), v)) - }) -} - // DemoParsedEQ applies the EQ predicate on the "demo_parsed" field. func DemoParsedEQ(v bool) predicate.Match { return predicate.Match(func(s *sql.Selector) { diff --git a/ent/match_create.go b/ent/match_create.go index d161a52..b779a32 100644 --- a/ent/match_create.go +++ b/ent/match_create.go @@ -92,20 +92,6 @@ func (mc *MatchCreate) SetMaxRounds(i int) *MatchCreate { return mc } -// SetDemoExpired sets the "demo_expired" field. -func (mc *MatchCreate) SetDemoExpired(b bool) *MatchCreate { - mc.mutation.SetDemoExpired(b) - return mc -} - -// SetNillableDemoExpired sets the "demo_expired" field if the given value is not nil. -func (mc *MatchCreate) SetNillableDemoExpired(b *bool) *MatchCreate { - if b != nil { - mc.SetDemoExpired(*b) - } - return mc -} - // SetDemoParsed sets the "demo_parsed" field. func (mc *MatchCreate) SetDemoParsed(b bool) *MatchCreate { mc.mutation.SetDemoParsed(b) @@ -253,10 +239,6 @@ func (mc *MatchCreate) ExecX(ctx context.Context) { // defaults sets the default values of the builder before save. func (mc *MatchCreate) defaults() { - if _, ok := mc.mutation.DemoExpired(); !ok { - v := match.DefaultDemoExpired - mc.mutation.SetDemoExpired(v) - } if _, ok := mc.mutation.DemoParsed(); !ok { v := match.DefaultDemoParsed mc.mutation.SetDemoParsed(v) @@ -286,9 +268,6 @@ func (mc *MatchCreate) check() error { if _, ok := mc.mutation.MaxRounds(); !ok { return &ValidationError{Name: "max_rounds", err: errors.New(`ent: missing required field "max_rounds"`)} } - if _, ok := mc.mutation.DemoExpired(); !ok { - return &ValidationError{Name: "demo_expired", err: errors.New(`ent: missing required field "demo_expired"`)} - } if _, ok := mc.mutation.DemoParsed(); !ok { return &ValidationError{Name: "demo_parsed", err: errors.New(`ent: missing required field "demo_parsed"`)} } @@ -397,14 +376,6 @@ func (mc *MatchCreate) createSpec() (*Match, *sqlgraph.CreateSpec) { }) _node.MaxRounds = value } - if value, ok := mc.mutation.DemoExpired(); ok { - _spec.Fields = append(_spec.Fields, &sqlgraph.FieldSpec{ - Type: field.TypeBool, - Value: value, - Column: match.FieldDemoExpired, - }) - _node.DemoExpired = value - } if value, ok := mc.mutation.DemoParsed(); ok { _spec.Fields = append(_spec.Fields, &sqlgraph.FieldSpec{ Type: field.TypeBool, diff --git a/ent/match_update.go b/ent/match_update.go index 68af549..f074291 100644 --- a/ent/match_update.go +++ b/ent/match_update.go @@ -146,20 +146,6 @@ func (mu *MatchUpdate) AddMaxRounds(i int) *MatchUpdate { return mu } -// SetDemoExpired sets the "demo_expired" field. -func (mu *MatchUpdate) SetDemoExpired(b bool) *MatchUpdate { - mu.mutation.SetDemoExpired(b) - return mu -} - -// SetNillableDemoExpired sets the "demo_expired" field if the given value is not nil. -func (mu *MatchUpdate) SetNillableDemoExpired(b *bool) *MatchUpdate { - if b != nil { - mu.SetDemoExpired(*b) - } - return mu -} - // SetDemoParsed sets the "demo_parsed" field. func (mu *MatchUpdate) SetDemoParsed(b bool) *MatchUpdate { mu.mutation.SetDemoParsed(b) @@ -465,13 +451,6 @@ func (mu *MatchUpdate) sqlSave(ctx context.Context) (n int, err error) { Column: match.FieldMaxRounds, }) } - if value, ok := mu.mutation.DemoExpired(); ok { - _spec.Fields.Set = append(_spec.Fields.Set, &sqlgraph.FieldSpec{ - Type: field.TypeBool, - Value: value, - Column: match.FieldDemoExpired, - }) - } if value, ok := mu.mutation.DemoParsed(); ok { _spec.Fields.Set = append(_spec.Fields.Set, &sqlgraph.FieldSpec{ Type: field.TypeBool, @@ -736,20 +715,6 @@ func (muo *MatchUpdateOne) AddMaxRounds(i int) *MatchUpdateOne { return muo } -// SetDemoExpired sets the "demo_expired" field. -func (muo *MatchUpdateOne) SetDemoExpired(b bool) *MatchUpdateOne { - muo.mutation.SetDemoExpired(b) - return muo -} - -// SetNillableDemoExpired sets the "demo_expired" field if the given value is not nil. -func (muo *MatchUpdateOne) SetNillableDemoExpired(b *bool) *MatchUpdateOne { - if b != nil { - muo.SetDemoExpired(*b) - } - return muo -} - // SetDemoParsed sets the "demo_parsed" field. func (muo *MatchUpdateOne) SetDemoParsed(b bool) *MatchUpdateOne { muo.mutation.SetDemoParsed(b) @@ -1079,13 +1044,6 @@ func (muo *MatchUpdateOne) sqlSave(ctx context.Context) (_node *Match, err error Column: match.FieldMaxRounds, }) } - if value, ok := muo.mutation.DemoExpired(); ok { - _spec.Fields.Set = append(_spec.Fields.Set, &sqlgraph.FieldSpec{ - Type: field.TypeBool, - Value: value, - Column: match.FieldDemoExpired, - }) - } if value, ok := muo.mutation.DemoParsed(); ok { _spec.Fields.Set = append(_spec.Fields.Set, &sqlgraph.FieldSpec{ Type: field.TypeBool, diff --git a/ent/migrate/schema.go b/ent/migrate/schema.go index 114fa0b..38ec227 100644 --- a/ent/migrate/schema.go +++ b/ent/migrate/schema.go @@ -20,7 +20,6 @@ var ( {Name: "duration", Type: field.TypeInt}, {Name: "match_result", Type: field.TypeInt}, {Name: "max_rounds", Type: field.TypeInt}, - {Name: "demo_expired", Type: field.TypeBool, Default: false}, {Name: "demo_parsed", Type: field.TypeBool, Default: false}, {Name: "eco", Type: field.TypeJSON, Nullable: true}, } diff --git a/ent/mutation.go b/ent/mutation.go index 8b199ef..0ddc64d 100644 --- a/ent/mutation.go +++ b/ent/mutation.go @@ -49,7 +49,6 @@ type MatchMutation struct { addmatch_result *int max_rounds *int addmax_rounds *int - demo_expired *bool demo_parsed *bool eco *struct { Rounds []*struct { @@ -605,42 +604,6 @@ func (m *MatchMutation) ResetMaxRounds() { m.addmax_rounds = nil } -// SetDemoExpired sets the "demo_expired" field. -func (m *MatchMutation) SetDemoExpired(b bool) { - m.demo_expired = &b -} - -// DemoExpired returns the value of the "demo_expired" field in the mutation. -func (m *MatchMutation) DemoExpired() (r bool, exists bool) { - v := m.demo_expired - if v == nil { - return - } - return *v, true -} - -// OldDemoExpired returns the old "demo_expired" field's value of the Match entity. -// If the Match object wasn't provided to the builder, the object is fetched from the database. -// An error is returned if the mutation operation is not UpdateOne, or the database query fails. -func (m *MatchMutation) OldDemoExpired(ctx context.Context) (v bool, err error) { - if !m.op.Is(OpUpdateOne) { - return v, fmt.Errorf("OldDemoExpired is only allowed on UpdateOne operations") - } - if m.id == nil || m.oldValue == nil { - return v, fmt.Errorf("OldDemoExpired requires an ID field in the mutation") - } - oldValue, err := m.oldValue(ctx) - if err != nil { - return v, fmt.Errorf("querying old value for OldDemoExpired: %w", err) - } - return oldValue.DemoExpired, nil -} - -// ResetDemoExpired resets all changes to the "demo_expired" field. -func (m *MatchMutation) ResetDemoExpired() { - m.demo_expired = nil -} - // SetDemoParsed sets the "demo_parsed" field. func (m *MatchMutation) SetDemoParsed(b bool) { m.demo_parsed = &b @@ -871,7 +834,7 @@ func (m *MatchMutation) Type() string { // order to get all numeric fields that were incremented/decremented, call // AddedFields(). func (m *MatchMutation) Fields() []string { - fields := make([]string, 0, 12) + fields := make([]string, 0, 11) if m.share_code != nil { fields = append(fields, match.FieldShareCode) } @@ -899,9 +862,6 @@ func (m *MatchMutation) Fields() []string { if m.max_rounds != nil { fields = append(fields, match.FieldMaxRounds) } - if m.demo_expired != nil { - fields = append(fields, match.FieldDemoExpired) - } if m.demo_parsed != nil { fields = append(fields, match.FieldDemoParsed) } @@ -934,8 +894,6 @@ func (m *MatchMutation) Field(name string) (ent.Value, bool) { return m.MatchResult() case match.FieldMaxRounds: return m.MaxRounds() - case match.FieldDemoExpired: - return m.DemoExpired() case match.FieldDemoParsed: return m.DemoParsed() case match.FieldEco: @@ -967,8 +925,6 @@ func (m *MatchMutation) OldField(ctx context.Context, name string) (ent.Value, e return m.OldMatchResult(ctx) case match.FieldMaxRounds: return m.OldMaxRounds(ctx) - case match.FieldDemoExpired: - return m.OldDemoExpired(ctx) case match.FieldDemoParsed: return m.OldDemoParsed(ctx) case match.FieldEco: @@ -1045,13 +1001,6 @@ func (m *MatchMutation) SetField(name string, value ent.Value) error { } m.SetMaxRounds(v) return nil - case match.FieldDemoExpired: - v, ok := value.(bool) - if !ok { - return fmt.Errorf("unexpected type %T for field %s", value, name) - } - m.SetDemoExpired(v) - return nil case match.FieldDemoParsed: v, ok := value.(bool) if !ok { @@ -1232,9 +1181,6 @@ func (m *MatchMutation) ResetField(name string) error { case match.FieldMaxRounds: m.ResetMaxRounds() return nil - case match.FieldDemoExpired: - m.ResetDemoExpired() - return nil case match.FieldDemoParsed: m.ResetDemoParsed() return nil diff --git a/ent/runtime.go b/ent/runtime.go index bfb8aa7..3ebb57d 100644 --- a/ent/runtime.go +++ b/ent/runtime.go @@ -15,12 +15,8 @@ import ( func init() { matchFields := schema.Match{}.Fields() _ = matchFields - // matchDescDemoExpired is the schema descriptor for demo_expired field. - matchDescDemoExpired := matchFields[10].Descriptor() - // match.DefaultDemoExpired holds the default value on creation for the demo_expired field. - match.DefaultDemoExpired = matchDescDemoExpired.Default.(bool) // matchDescDemoParsed is the schema descriptor for demo_parsed field. - matchDescDemoParsed := matchFields[11].Descriptor() + matchDescDemoParsed := matchFields[10].Descriptor() // match.DefaultDemoParsed holds the default value on creation for the demo_parsed field. match.DefaultDemoParsed = matchDescDemoParsed.Default.(bool) playerFields := schema.Player{}.Fields() diff --git a/ent/schema/match.go b/ent/schema/match.go index b73232f..c4ae46f 100644 --- a/ent/schema/match.go +++ b/ent/schema/match.go @@ -24,7 +24,6 @@ func (Match) Fields() []ent.Field { field.Int("duration"), field.Int("match_result"), field.Int("max_rounds"), - field.Bool("demo_expired").Default(false), field.Bool("demo_parsed").Default(false), field.JSON("eco", struct { Rounds []*struct { diff --git a/main.go b/main.go index ba065ee..013c6a0 100644 --- a/main.go +++ b/main.go @@ -37,8 +37,6 @@ var ( db *utils.DBWithLock rdb *redis.Client rdc *cache.Cache - sendGC chan *csgo.Demo - demoParser = &csgo.DemoParser{} firstHK = true rL ratelimit.Limiter configFlag = flag.String("config", "config.yaml", "Set config to use") @@ -89,6 +87,7 @@ func housekeeping() { } firstHK = false + // update players from steam db.Lock.RLock() tPlayerNeedSteamUpdate, err := db.Client.Player.Query().Where( player.SteamUpdatedLTE(time.Now().UTC().AddDate(0, 0, -1)), @@ -103,6 +102,7 @@ func housekeeping() { _, err = utils.UpdatePlayerFromSteam(tPlayer, conf.Steam.APIKey, db.Lock, rL) } + // getting new sharecodes if !demoLoader.GCReady { log.Warningf("[HK] GC not ready, skipping sharecode refresh") continue @@ -131,11 +131,28 @@ func housekeeping() { } for _, code := range shareCodes { - sendGC <- &csgo.Demo{ + err := demoLoader.LoadDemo(&csgo.Demo{ ShareCode: code, + }) + if err != nil { + log.Warningf("[HK] Failure to queue match: %v", err) } } } + + // try parsing demos not parsed + tMatches, err := db.Client.Match.Query().Where(match.And(match.DateGT(time.Now().UTC().AddDate(0, 0, -30)), match.DemoParsed(false))).All(context.Background()) + if err != nil { + log.Warningf("[HK] Failure getting matches to retry parsing: %v", err) + continue + } + + for _, m := range tMatches { + err := demoLoader.LoadDemo(&csgo.Demo{MatchId: m.ID, ShareCode: m.ShareCode}) + if err != nil { + log.Warningf("[HK] Failure trying to parse match %d: %v", m.ID, err) + } + } } } @@ -294,7 +311,12 @@ func postPlayerTrackMe(w http.ResponseWriter, r *http.Request) { } if shareCode != "" && utils.ShareCodeRegEx.MatchString(shareCode) { - sendGC <- &csgo.Demo{ShareCode: shareCode} + err := demoLoader.LoadDemo(&csgo.Demo{ShareCode: shareCode}) + if err != nil { + log.Warningf("[PPTM] unable to queue match: %v", err) + w.WriteHeader(http.StatusServiceUnavailable) + return + } } w.WriteHeader(http.StatusOK) @@ -310,8 +332,13 @@ func getMatchParse(w http.ResponseWriter, r *http.Request) { return } - sendGC <- &csgo.Demo{ + err := demoLoader.LoadDemo(&csgo.Demo{ ShareCode: shareCode, + }) + if err != nil { + log.Warningf("[PPTM] unable to queue match: %v", err) + w.WriteHeader(http.StatusServiceUnavailable) + return } w.WriteHeader(http.StatusOK) @@ -407,13 +434,19 @@ func main() { flag.Parse() confStr, err := os.ReadFile(*configFlag) - utils.Check(err) + if err != nil { + log.Fatalf("Unable to open config: %v", err) + } err = yaml.Unmarshal(confStr, &conf) - utils.Check(err) + if err != nil { + log.Fatalf("Unable to parse config: %v", err) + } lvl, err := log.ParseLevel(conf.Logging.Level) - utils.Check(err) + if err != nil { + log.Fatalf("Failure setting logging level: %v", err) + } log.SetLevel(lvl) if *journalLogFlag { journalhook.Enable() @@ -428,7 +461,7 @@ func main() { log.Panicf("Failed to open database %s: %v", "opencsgo.db", err) } defer func(dbSQLite *ent.Client) { - utils.Check(dbSQLite.Close()) + _ = dbSQLite.Close() }(db.Client) if err := db.Client.Schema.Create(context.Background(), migrate.WithDropIndex(true), migrate.WithDropColumn(true)); err != nil { @@ -447,8 +480,21 @@ func main() { }) rL = ratelimit.New(conf.Steam.RatePerSecond) + // setup GC - err = demoLoader.Setup(conf.Steam.Username, conf.Steam.Password, *authCodeFlag, conf.Steam.Sentry, conf.Steam.LoginKey, conf.Steam.ServerList) + err = demoLoader.Setup(&csgo.DemoMatchLoaderConfig{ + Username: conf.Steam.Username, + Password: conf.Steam.Password, + AuthCode: *authCodeFlag, + Sentry: conf.Steam.Sentry, + LoginKey: conf.Steam.LoginKey, + ServerList: conf.Steam.ServerList, + Db: db.Client, + Lock: db.Lock, + Worker: conf.Parser.Worker, + ApiKey: conf.Steam.APIKey, + RateLimit: rL, + }) if err != nil { log.Fatalf("Unbale to setup DemoLoader: %v", err) } @@ -459,9 +505,6 @@ func main() { } log.Info("GC ready, starting HTTP server") - sendGC = make(chan *csgo.Demo, 100) - utils.Check(demoParser.Setup(db.Client, db.Lock, conf.Parser.Worker)) - go utils.GCInfoParser(sendGC, demoLoader, demoParser, db, conf.Steam.APIKey, rL) go housekeeping() // Define routes diff --git a/utils/utils.go b/utils/utils.go index 3adef07..363c405 100644 --- a/utils/utils.go +++ b/utils/utils.go @@ -2,7 +2,6 @@ package utils import ( "context" - "csgowtfd/csgo" "csgowtfd/ent" "csgowtfd/ent/match" "csgowtfd/ent/player" @@ -232,7 +231,9 @@ func getNextShareCode(lastCode string, apiKey string, authCode string, steamId u return "", fmt.Errorf("bad response from steam api (HTTP %d)", r.StatusCode) } - defer r.Body.Close() + defer func(Body io.ReadCloser) { + _ = Body.Close() + }(r.Body) bJson, err := ioutil.ReadAll(r.Body) if err != nil { return "", err @@ -319,104 +320,6 @@ func GetPlayerFromSteamID64(db *DBWithLock, steamID uint64, apiKey string, rl ra } } -func GCInfoParser(channel chan *csgo.Demo, dl *csgo.DemoMatchLoader, dp *csgo.DemoParser, db *DBWithLock, apiKey string, rl ratelimit.Limiter) { - for { - select { - case demo := <-channel: - if !dl.GCReady { - time.Sleep(5 * time.Second) - channel <- demo - } - - matchId, _, _, err := csgo.DecodeSharecode(demo.ShareCode) - Check(err) - if matchId == 0 { - log.Warningf("Can't parse match with sharecode %s", demo.ShareCode) - continue - } - - db.Lock.RLock() - iMatch, err := db.Client.Match.Query().Where(match.ID(matchId)).Only(context.Background()) - db.Lock.RUnlock() - if err != nil { - switch e := err.(type) { - case *ent.NotFoundError: - break - default: - Check(e) - } - } else { - if iMatch.DemoParsed == false && !iMatch.DemoExpired { - log.Infof("Match %d is loaded, but not parsed. Try parsing.", demo.MatchId) - demo.MatchId = matchId - demo.Url = iMatch.ReplayURL - dp.ParseDemo(demo) - continue - } - - log.Debugf("Skipped match %d: already parsed", matchId) - continue - } - - matchDetails, err := dl.GetMatchDetails(demo.ShareCode) - Check(err) - matchZero := matchDetails.GetMatches()[0] - lastRound := matchZero.GetRoundstatsall()[len(matchZero.Roundstatsall)-1] - var players []*ent.Player - - for _, accountId := range lastRound.GetReservation().GetAccountIds() { - tPlayer, err := GetPlayer(db, csgo.AccountId2SteamId(accountId), apiKey, rl) - Check(err) - players = append(players, tPlayer) - } - - demo.Url = lastRound.GetMap() - demo.MatchId = matchZero.GetMatchid() - - db.Lock.Lock() - tMatch, err := db.Client.Match.Create(). - SetID(matchZero.GetMatchid()). - AddPlayers(players...). - SetDate(time.Unix(int64(matchZero.GetMatchtime()), 0).UTC()). - SetMaxRounds(int(lastRound.GetMaxRounds())). - SetDuration(int(lastRound.GetMatchDuration())). - SetShareCode(demo.ShareCode). - SetReplayURL(lastRound.GetMap()). - SetScoreTeamA(int(lastRound.GetTeamScores()[0])). - SetScoreTeamB(int(lastRound.GetTeamScores()[1])). - SetMatchResult(int(lastRound.GetMatchResult())). - Save(context.Background()) - db.Lock.Unlock() - Check(err) - - for i, mPlayer := range players { - var teamId int - if i > 4 { - teamId = 2 - } else { - teamId = 1 - } - db.Lock.Lock() - err := db.Client.Stats.Create(). - SetMatches(tMatch). - SetPlayers(mPlayer). - SetTeamID(teamId). - SetKills(int(lastRound.GetKills()[i])). - SetDeaths(int(lastRound.GetDeaths()[i])). - SetAssists(int(lastRound.GetAssists()[i])). - SetMvp(int(lastRound.GetMvps()[i])). - SetScore(int(lastRound.GetScores()[i])). - SetHeadshot(int(lastRound.GetEnemyHeadshots()[i])). - Exec(context.Background()) - db.Lock.Unlock() - Check(err) - } - - dp.ParseDemo(demo) - } - } -} - func SteamProfile2XML(id string, steamID64 uint64) (*CommunityXML, error) { var r *http.Response var err error @@ -425,10 +328,11 @@ func SteamProfile2XML(id string, steamID64 uint64) (*CommunityXML, error) { } else { r, err = http.Get(fmt.Sprintf(steamVanityURLEntry, id)) } - Check(err) + if err != nil { + return nil, err + } defer func(Body io.ReadCloser) { - err := Body.Close() - Check(err) + _ = Body.Close() }(r.Body) body, err := ioutil.ReadAll(r.Body) @@ -480,9 +384,3 @@ func UpdatePlayerFromSteam(player *ent.Player, apiKey string, lock *sync.RWMutex return tPlayer, nil } - -func Check(e error) { - if e != nil { - panic(e) - } -}