diff --git a/csgo/demo_loader.go b/csgo/demo_loader.go index 5665afc..df33207 100644 --- a/csgo/demo_loader.go +++ b/csgo/demo_loader.go @@ -12,7 +12,7 @@ import ( "github.com/an0nfunc/go-steam/v3/protocol/steamlang" "github.com/go-redis/cache/v8" log "github.com/sirupsen/logrus" - "go.uber.org/ratelimit" + "golang.org/x/time/rate" "google.golang.org/protobuf/proto" "io/ioutil" "os" @@ -37,7 +37,7 @@ type DemoMatchLoaderConfig struct { Db *ent.Client Worker int ApiKey string - RateLimit ratelimit.Limiter + RateLimit *rate.Limiter Cache *cache.Cache SprayTimeout int RetryTimeout int @@ -370,7 +370,7 @@ func (dml *DemoMatchLoader) demoWorker() { } } -func (dml *DemoMatchLoader) handleDemo(demo *Demo, apiKey string, rl ratelimit.Limiter) error { +func (dml *DemoMatchLoader) handleDemo(demo *Demo, apiKey string, rl *rate.Limiter) error { if dml.IsLoading(demo) { log.Infof("[DL] Skipping %s: parsing in progress", demo.ShareCode) return nil @@ -549,7 +549,7 @@ func (dml *DemoMatchLoader) handleDemo(demo *Demo, apiKey string, rl ratelimit.L return nil } -func (dml *DemoMatchLoader) gcWorker(apiKey string, rl ratelimit.Limiter) { +func (dml *DemoMatchLoader) gcWorker(apiKey string, rl *rate.Limiter) { for demo := range dml.parseDemo { err := dml.handleDemo(demo, apiKey, rl) if err != nil { diff --git a/go.mod b/go.mod index 54bab1a..fb16aec 100644 --- a/go.mod +++ b/go.mod @@ -15,8 +15,8 @@ require ( github.com/markus-wa/demoinfocs-golang/v2 v2.13.0 github.com/sirupsen/logrus v1.8.1 github.com/wercker/journalhook v0.0.0-20180428041537-5d0a5ae867b3 - go.uber.org/ratelimit v0.2.0 golang.org/x/text v0.3.7 + golang.org/x/time v0.0.0-20220224211638-0e9765cccd65 google.golang.org/protobuf v1.28.0 gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b ) diff --git a/go.sum b/go.sum index c72157d..055378e 100644 --- a/go.sum +++ b/go.sum @@ -955,6 +955,8 @@ golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxb golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/time v0.0.0-20220224211638-0e9765cccd65 h1:M73Iuj3xbbb9Uk1DYhzydthsj6oOd6l9bpuFcNoUvTs= +golang.org/x/time v0.0.0-20220224211638-0e9765cccd65/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/main.go b/main.go index c57fa31..7573fff 100644 --- a/main.go +++ b/main.go @@ -24,8 +24,8 @@ import ( "github.com/markus-wa/demoinfocs-golang/v2/pkg/demoinfocs/common" log "github.com/sirupsen/logrus" "github.com/wercker/journalhook" - "go.uber.org/ratelimit" "golang.org/x/text/language" + "golang.org/x/time/rate" "gopkg.in/yaml.v3" "net" "net/http" @@ -45,7 +45,7 @@ var ( db *ent.Client rdb *redis.Client rdc *cache.Cache - rL ratelimit.Limiter + rL *rate.Limiter configFlag = flag.String("config", "config.yaml", "Set config file to use") journalLogFlag = flag.Bool("journal", false, "Log to systemd journal instead of stdout") sqlDebugFlag = flag.Bool("sqldebug", false, "Debug SQL queries") @@ -1150,7 +1150,7 @@ func main() { LocalCache: cache.NewTinyLFU(1000, time.Minute), }) - rL = ratelimit.New(conf.Steam.RatePerSecond) + rL = rate.NewLimiter(rate.Limit(conf.Steam.RatePerSecond), 100) // setup GC err = demoLoader.Setup(&csgo.DemoMatchLoaderConfig{ diff --git a/utils/utils.go b/utils/utils.go index 3656342..10a01bc 100644 --- a/utils/utils.go +++ b/utils/utils.go @@ -15,9 +15,10 @@ import ( "git.harting.dev/csgowtf/csgowtfd/ent/weapon" "github.com/an0nfunc/go-steamapi" log "github.com/sirupsen/logrus" - "go.uber.org/ratelimit" + "golang.org/x/time/rate" "io" "io/ioutil" + "math" "net/http" "net/url" "path" @@ -42,9 +43,9 @@ type Conf struct { Steam struct { Username string Password string - AuthCode string `yaml:"auth_code"` - APIKey string `yaml:"api_key"` - RatePerSecond int `yaml:"rate_per_sec"` + AuthCode string `yaml:"auth_code"` + APIKey string `yaml:"api_key"` + RatePerSecond float64 `yaml:"rate_per_sec"` Sentry string LoginKey string `yaml:"login_key"` MaxRetryWait int `yaml:"max_retry_wait"` @@ -494,7 +495,7 @@ func GetWinLossTieForPlayer(dbPlayer *ent.Player) (wins int, looses int, ties in return } -func IsAuthCodeValid(player *ent.Player, apiKey string, shareCode string, authCode string, rl ratelimit.Limiter) (bool, error) { +func IsAuthCodeValid(player *ent.Player, apiKey string, shareCode string, authCode string, rl *rate.Limiter) (bool, error) { var tMatch *ent.Match var err error if shareCode == "" { @@ -517,7 +518,7 @@ func IsAuthCodeValid(player *ent.Player, apiKey string, shareCode string, authCo } } -func GetNewShareCodesForPlayer(player *ent.Player, apiKey string, rl ratelimit.Limiter) ([]string, error) { +func GetNewShareCodesForPlayer(player *ent.Player, apiKey string, rl *rate.Limiter) ([]string, error) { latestMatch, err := player.QueryMatches().Order(ent.Desc(match.FieldDate)).First(context.Background()) if err != nil { return nil, err @@ -555,13 +556,16 @@ func GetNewShareCodesForPlayer(player *ent.Player, apiKey string, rl ratelimit.L return rCodes, nil } -func getNextShareCode(lastCode string, apiKey string, authCode string, steamId uint64, rl ratelimit.Limiter) (string, error) { +func getNextShareCode(lastCode string, apiKey string, authCode string, steamId uint64, rl *rate.Limiter) (string, error) { if lastCode == "" || apiKey == "" || authCode == "" || steamId == 0 { return "", fmt.Errorf("invalid arguments") } if rl != nil { - rl.Take() + err := rl.Wait(context.Background()) + if err != nil { + return "", err + } } log.Debugf("[SC] STEAMPI with %s", fmt.Sprintf(shareCodeURLEntry, "REDACTED", steamId, "REDACTED", lastCode)) r, err := http.Get(fmt.Sprintf(shareCodeURLEntry, apiKey, steamId, authCode, lastCode)) @@ -603,7 +607,7 @@ func getNextShareCode(lastCode string, apiKey string, authCode string, steamId u return rJson.Result.Code, nil } -func Player(db *ent.Client, id interface{}, apiKey string, rl ratelimit.Limiter) (*ent.Player, error) { +func Player(db *ent.Client, id interface{}, apiKey string, rl *rate.Limiter) (*ent.Player, error) { switch e := id.(type) { case uint64: return PlayerFromSteamID64(db, e, apiKey, rl) @@ -623,7 +627,7 @@ func Player(db *ent.Client, id interface{}, apiKey string, rl ratelimit.Limiter) } } -func PlayerFromVanityURL(db *ent.Client, id string, apiKey string, rl ratelimit.Limiter) (*ent.Player, error) { +func PlayerFromVanityURL(db *ent.Client, id string, apiKey string, rl *rate.Limiter) (*ent.Player, error) { if id == "" { return nil, fmt.Errorf("invalid arguments") } @@ -633,7 +637,10 @@ func PlayerFromVanityURL(db *ent.Client, id string, apiKey string, rl ratelimit. return tPlayer, nil } else { if rl != nil { - rl.Take() + err := rl.Wait(context.Background()) + if err != nil { + return nil, err + } } resp, err := steamapi.ResolveVanityURL(id, apiKey) if err != nil { @@ -653,7 +660,7 @@ func PlayerFromVanityURL(db *ent.Client, id string, apiKey string, rl ratelimit. } } -func PlayerFromSteamID64(db *ent.Client, steamID uint64, apiKey string, rl ratelimit.Limiter) (*ent.Player, error) { +func PlayerFromSteamID64(db *ent.Client, steamID uint64, apiKey string, rl *rate.Limiter) (*ent.Player, error) { tPlayer, err := db.Player.Get(context.Background(), steamID) if err == nil { return tPlayer, nil @@ -718,15 +725,20 @@ func TranslateWithDeepL(text string, language string, baseURL string, apiKey str } } -func PlayerFromSteam(players []*ent.Player, db *ent.Client, apiKey string, rl ratelimit.Limiter) ([]*ent.Player, error) { +func PlayerFromSteam(players []*ent.Player, db *ent.Client, apiKey string, rl *rate.Limiter) ([]*ent.Player, error) { var idsToUpdate []uint64 for _, updatePlayer := range players { idsToUpdate = append(idsToUpdate, updatePlayer.ID) } + batches := int(math.Round((float64(len(players)) / 100) + 0.5)) + if rl != nil { - rl.Take() + err := rl.WaitN(context.Background(), batches) + if err != nil { + return nil, err + } } playerSum, err := steamapi.GetPlayerSummaries(idsToUpdate, apiKey) if err != nil { @@ -771,7 +783,10 @@ func PlayerFromSteam(players []*ent.Player, db *ent.Client, apiKey string, rl ra } if rl != nil { - rl.Take() + err := rl.WaitN(context.Background(), batches) + if err != nil { + return nil, err + } } bans, err := steamapi.GetPlayerBans(idsToUpdate, apiKey) if err != nil {