switched to rate.Limiter

This commit is contained in:
2022-03-30 15:32:25 +02:00
parent ac512aec2f
commit acbfb97014
5 changed files with 40 additions and 23 deletions

View File

@@ -12,7 +12,7 @@ import (
"github.com/an0nfunc/go-steam/v3/protocol/steamlang"
"github.com/go-redis/cache/v8"
log "github.com/sirupsen/logrus"
"go.uber.org/ratelimit"
"golang.org/x/time/rate"
"google.golang.org/protobuf/proto"
"io/ioutil"
"os"
@@ -37,7 +37,7 @@ type DemoMatchLoaderConfig struct {
Db *ent.Client
Worker int
ApiKey string
RateLimit ratelimit.Limiter
RateLimit *rate.Limiter
Cache *cache.Cache
SprayTimeout int
RetryTimeout int
@@ -370,7 +370,7 @@ func (dml *DemoMatchLoader) demoWorker() {
}
}
func (dml *DemoMatchLoader) handleDemo(demo *Demo, apiKey string, rl ratelimit.Limiter) error {
func (dml *DemoMatchLoader) handleDemo(demo *Demo, apiKey string, rl *rate.Limiter) error {
if dml.IsLoading(demo) {
log.Infof("[DL] Skipping %s: parsing in progress", demo.ShareCode)
return nil
@@ -549,7 +549,7 @@ func (dml *DemoMatchLoader) handleDemo(demo *Demo, apiKey string, rl ratelimit.L
return nil
}
func (dml *DemoMatchLoader) gcWorker(apiKey string, rl ratelimit.Limiter) {
func (dml *DemoMatchLoader) gcWorker(apiKey string, rl *rate.Limiter) {
for demo := range dml.parseDemo {
err := dml.handleDemo(demo, apiKey, rl)
if err != nil {

2
go.mod
View File

@@ -15,8 +15,8 @@ require (
github.com/markus-wa/demoinfocs-golang/v2 v2.13.0
github.com/sirupsen/logrus v1.8.1
github.com/wercker/journalhook v0.0.0-20180428041537-5d0a5ae867b3
go.uber.org/ratelimit v0.2.0
golang.org/x/text v0.3.7
golang.org/x/time v0.0.0-20220224211638-0e9765cccd65
google.golang.org/protobuf v1.28.0
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b
)

2
go.sum
View File

@@ -955,6 +955,8 @@ golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxb
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20220224211638-0e9765cccd65 h1:M73Iuj3xbbb9Uk1DYhzydthsj6oOd6l9bpuFcNoUvTs=
golang.org/x/time v0.0.0-20220224211638-0e9765cccd65/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=

View File

@@ -24,8 +24,8 @@ import (
"github.com/markus-wa/demoinfocs-golang/v2/pkg/demoinfocs/common"
log "github.com/sirupsen/logrus"
"github.com/wercker/journalhook"
"go.uber.org/ratelimit"
"golang.org/x/text/language"
"golang.org/x/time/rate"
"gopkg.in/yaml.v3"
"net"
"net/http"
@@ -45,7 +45,7 @@ var (
db *ent.Client
rdb *redis.Client
rdc *cache.Cache
rL ratelimit.Limiter
rL *rate.Limiter
configFlag = flag.String("config", "config.yaml", "Set config file to use")
journalLogFlag = flag.Bool("journal", false, "Log to systemd journal instead of stdout")
sqlDebugFlag = flag.Bool("sqldebug", false, "Debug SQL queries")
@@ -1150,7 +1150,7 @@ func main() {
LocalCache: cache.NewTinyLFU(1000, time.Minute),
})
rL = ratelimit.New(conf.Steam.RatePerSecond)
rL = rate.NewLimiter(rate.Limit(conf.Steam.RatePerSecond), 100)
// setup GC
err = demoLoader.Setup(&csgo.DemoMatchLoaderConfig{

View File

@@ -15,9 +15,10 @@ import (
"git.harting.dev/csgowtf/csgowtfd/ent/weapon"
"github.com/an0nfunc/go-steamapi"
log "github.com/sirupsen/logrus"
"go.uber.org/ratelimit"
"golang.org/x/time/rate"
"io"
"io/ioutil"
"math"
"net/http"
"net/url"
"path"
@@ -42,9 +43,9 @@ type Conf struct {
Steam struct {
Username string
Password string
AuthCode string `yaml:"auth_code"`
APIKey string `yaml:"api_key"`
RatePerSecond int `yaml:"rate_per_sec"`
AuthCode string `yaml:"auth_code"`
APIKey string `yaml:"api_key"`
RatePerSecond float64 `yaml:"rate_per_sec"`
Sentry string
LoginKey string `yaml:"login_key"`
MaxRetryWait int `yaml:"max_retry_wait"`
@@ -494,7 +495,7 @@ func GetWinLossTieForPlayer(dbPlayer *ent.Player) (wins int, looses int, ties in
return
}
func IsAuthCodeValid(player *ent.Player, apiKey string, shareCode string, authCode string, rl ratelimit.Limiter) (bool, error) {
func IsAuthCodeValid(player *ent.Player, apiKey string, shareCode string, authCode string, rl *rate.Limiter) (bool, error) {
var tMatch *ent.Match
var err error
if shareCode == "" {
@@ -517,7 +518,7 @@ func IsAuthCodeValid(player *ent.Player, apiKey string, shareCode string, authCo
}
}
func GetNewShareCodesForPlayer(player *ent.Player, apiKey string, rl ratelimit.Limiter) ([]string, error) {
func GetNewShareCodesForPlayer(player *ent.Player, apiKey string, rl *rate.Limiter) ([]string, error) {
latestMatch, err := player.QueryMatches().Order(ent.Desc(match.FieldDate)).First(context.Background())
if err != nil {
return nil, err
@@ -555,13 +556,16 @@ func GetNewShareCodesForPlayer(player *ent.Player, apiKey string, rl ratelimit.L
return rCodes, nil
}
func getNextShareCode(lastCode string, apiKey string, authCode string, steamId uint64, rl ratelimit.Limiter) (string, error) {
func getNextShareCode(lastCode string, apiKey string, authCode string, steamId uint64, rl *rate.Limiter) (string, error) {
if lastCode == "" || apiKey == "" || authCode == "" || steamId == 0 {
return "", fmt.Errorf("invalid arguments")
}
if rl != nil {
rl.Take()
err := rl.Wait(context.Background())
if err != nil {
return "", err
}
}
log.Debugf("[SC] STEAMPI with %s", fmt.Sprintf(shareCodeURLEntry, "REDACTED", steamId, "REDACTED", lastCode))
r, err := http.Get(fmt.Sprintf(shareCodeURLEntry, apiKey, steamId, authCode, lastCode))
@@ -603,7 +607,7 @@ func getNextShareCode(lastCode string, apiKey string, authCode string, steamId u
return rJson.Result.Code, nil
}
func Player(db *ent.Client, id interface{}, apiKey string, rl ratelimit.Limiter) (*ent.Player, error) {
func Player(db *ent.Client, id interface{}, apiKey string, rl *rate.Limiter) (*ent.Player, error) {
switch e := id.(type) {
case uint64:
return PlayerFromSteamID64(db, e, apiKey, rl)
@@ -623,7 +627,7 @@ func Player(db *ent.Client, id interface{}, apiKey string, rl ratelimit.Limiter)
}
}
func PlayerFromVanityURL(db *ent.Client, id string, apiKey string, rl ratelimit.Limiter) (*ent.Player, error) {
func PlayerFromVanityURL(db *ent.Client, id string, apiKey string, rl *rate.Limiter) (*ent.Player, error) {
if id == "" {
return nil, fmt.Errorf("invalid arguments")
}
@@ -633,7 +637,10 @@ func PlayerFromVanityURL(db *ent.Client, id string, apiKey string, rl ratelimit.
return tPlayer, nil
} else {
if rl != nil {
rl.Take()
err := rl.Wait(context.Background())
if err != nil {
return nil, err
}
}
resp, err := steamapi.ResolveVanityURL(id, apiKey)
if err != nil {
@@ -653,7 +660,7 @@ func PlayerFromVanityURL(db *ent.Client, id string, apiKey string, rl ratelimit.
}
}
func PlayerFromSteamID64(db *ent.Client, steamID uint64, apiKey string, rl ratelimit.Limiter) (*ent.Player, error) {
func PlayerFromSteamID64(db *ent.Client, steamID uint64, apiKey string, rl *rate.Limiter) (*ent.Player, error) {
tPlayer, err := db.Player.Get(context.Background(), steamID)
if err == nil {
return tPlayer, nil
@@ -718,15 +725,20 @@ func TranslateWithDeepL(text string, language string, baseURL string, apiKey str
}
}
func PlayerFromSteam(players []*ent.Player, db *ent.Client, apiKey string, rl ratelimit.Limiter) ([]*ent.Player, error) {
func PlayerFromSteam(players []*ent.Player, db *ent.Client, apiKey string, rl *rate.Limiter) ([]*ent.Player, error) {
var idsToUpdate []uint64
for _, updatePlayer := range players {
idsToUpdate = append(idsToUpdate, updatePlayer.ID)
}
batches := int(math.Round((float64(len(players)) / 100) + 0.5))
if rl != nil {
rl.Take()
err := rl.WaitN(context.Background(), batches)
if err != nil {
return nil, err
}
}
playerSum, err := steamapi.GetPlayerSummaries(idsToUpdate, apiKey)
if err != nil {
@@ -771,7 +783,10 @@ func PlayerFromSteam(players []*ent.Player, db *ent.Client, apiKey string, rl ra
}
if rl != nil {
rl.Take()
err := rl.WaitN(context.Background(), batches)
if err != nil {
return nil, err
}
}
bans, err := steamapi.GetPlayerBans(idsToUpdate, apiKey)
if err != nil {