Files
upload-baker/main.go

251 lines
7.0 KiB
Go

package main
import (
"errors"
"flag"
"fmt"
"github.com/go-ping/ping"
log "github.com/sirupsen/logrus"
"github.com/wercker/journalhook"
"gopkg.in/yaml.v2"
"net"
"os"
"os/exec"
"os/signal"
"regexp"
"strconv"
"syscall"
"time"
)
// Conf is the runtime configuration, decoded from the YAML config file.
type Conf struct {
	// Bandwidth bounds the upload rate this tool will configure. Values are
	// whole Mbit as far as tc is concerned (see SetUpload).
	Bandwidth struct {
		// Max is the highest rate the worker will raise the shaper to.
		Max float64 `yaml:"max"`
		// Min is the floor; the shaper is set to this value when a
		// computed reduction would leave the allowed range.
		Min float64 `yaml:"min"`
		// Step is the amount added when trying to restore bandwidth, and
		// the base unit for the (doubling) reductions under congestion.
		Step float64 `yaml:"step"`
	} `yaml:"bandwidth"`

	// Interval is the pause between regular ping rounds, in seconds. Some
	// failure paths in pingWorker multiply it by a minute instead.
	Interval int `yaml:"interval"`
	// Host is the ping target used to measure latency.
	Host string `yaml:"host"`
	// PPP is the number of echo requests sent per regular ping round.
	PPP int `yaml:"ppp"`
	// ConformationPPP is the number of echo requests sent per extended
	// ping round (name kept as-is; presumably "confirmation").
	ConformationPPP int `yaml:"conformation_ppp"`
	// LogLevel is a logrus level name, parsed at start-up.
	LogLevel string `yaml:"log_level"`
	// UploadInterface names the network interface whose tc qdisc is read
	// and changed, and whose IPv4 address is used as ping source.
	UploadInterface string `yaml:"upload_interface"`
	// ThrottlePingThreshold is the average RTT in milliseconds at or above
	// which the link counts as congested; half of it is the limit for the
	// RTT standard deviation.
	ThrottlePingThreshold int64 `yaml:"throttle_ping_threshold"`
	// TryRestoringAfter is the number of minutes without congestion after
	// which the worker tries to raise the bandwidth again.
	TryRestoringAfter int `yaml:"try_restoring_after"`
}
// Command-line flags.
var (
	journalLog = flag.Bool("journal", false, "Log to systemd journal instead of stdout")
	configFlag = flag.String("config", "config.yaml", "Set config file to use")
)

// Package-wide state shared by the worker and its helpers.
var (
	// conf holds the parsed configuration; it is populated in main.
	conf *Conf

	// reBandwidth captures the integer Mbit value following "bandwidth"
	// in the output of `tc qdisc show`.
	reBandwidth = regexp.MustCompile(`(?m)bandwidth\s(\d+)Mbit`)

	// ErrorNoResponse is returned by doPing when every packet was lost.
	ErrorNoResponse = errors.New("no response")
)
// UploadManager reads and adjusts the upload bandwidth limit and tracks
// when latency was last seen over the configured threshold.
type UploadManager struct {
	// LastOver is the time of the most recent over-threshold measurement
	// or bandwidth change; its zero value means "never".
	LastOver time.Time
}
// Upload returns the bandwidth currently configured on the upload
// interface, in Mbit, as reported by `tc qdisc show dev <interface>`.
//
// It returns an error when tc cannot be executed or exits non-zero, when its
// output contains no "bandwidth <n>Mbit" token, or when the captured number
// does not fit an int.
func (m UploadManager) Upload() (float64, error) {
	tcCmd := exec.Command("tc", "qdisc", "show", "dev", conf.UploadInterface) //nolint:gosec
	out, err := tcCmd.CombinedOutput()
	if err != nil {
		return 0, fmt.Errorf("tc qdisc show dev %s: %w", conf.UploadInterface, err)
	}

	// A single pass over the raw bytes yields the first match and its
	// capture group; nil means the pattern did not match at all.
	match := reBandwidth.FindSubmatch(out)
	if match == nil {
		return 0, fmt.Errorf("could not parse bandwidth from tc")
	}

	up, err := strconv.Atoi(string(match[1]))
	if err != nil {
		return 0, fmt.Errorf("parsing tc bandwidth %q: %w", match[1], err)
	}
	return float64(up), nil
}
// SetUpload changes the cake qdisc on the upload interface to the given
// bandwidth via `tc qdisc change`.
//
// The value handed to tc is upload truncated to a whole number of Mbit; the
// value returned on success is upload itself, untruncated. On failure the
// command and its combined output are logged at debug level and the error
// from running tc is returned together with 0.
func (m UploadManager) SetUpload(upload float64) (float64, error) {
	rate := fmt.Sprintf("%dMbit", int(upload))
	cmd := exec.Command("tc", "qdisc", "change", "dev", conf.UploadInterface, "root", "cake", "bandwidth", rate) //nolint:gosec

	log.Debugf("[TC] exec %s", cmd.String())
	output, runErr := cmd.CombinedOutput()
	if runErr == nil {
		return upload, nil
	}

	log.Debugf("[TC] executing %s failed with: %v (%s)", cmd.String(), runErr, output)
	return 0, runErr
}
// doPing sends count privileged ICMP echo requests to host, spaced interval
// apart, from the first IPv4 address of the configured upload interface, and
// returns the resulting statistics.
//
// The overall timeout is two seconds per requested packet. An error is
// returned when the pinger cannot be created, the interface cannot be looked
// up or has no usable IPv4 address, or the run itself fails.
// ErrorNoResponse is returned when the run completes with 100% packet loss.
func doPing(host string, count int, interval time.Duration) (*ping.Statistics, error) {
	log.Debugf("[PING] starting to ping %s with %d packages", host, count)
	pinger, err := ping.NewPinger(host)
	if err != nil {
		return nil, err
	}
	pinger.SetPrivileged(true)
	pinger.Timeout = time.Second * time.Duration(count) * 2
	pinger.Count = count
	pinger.Interval = interval

	// Bind the ping to the upload interface by using one of its addresses
	// as the source.
	nif, err := net.InterfaceByName(conf.UploadInterface)
	if err != nil {
		return nil, err
	}
	addrs, err := nif.Addrs()
	if err != nil {
		return nil, err
	}
	if len(addrs) == 0 {
		return nil, fmt.Errorf("interface %s has no addressed assigned", conf.UploadInterface)
	}
	for _, addr := range addrs {
		// net.Addr is an interface; skip anything that is not an
		// *net.IPNet instead of panicking on a failed assertion.
		ipNet, ok := addr.(*net.IPNet)
		if !ok || ipNet.IP.To4() == nil {
			continue
		}
		pinger.Source = ipNet.IP.String()
		break
	}
	if pinger.Source == "" {
		return nil, fmt.Errorf("interface %s has no suitable address assigned", conf.UploadInterface)
	}

	if err = pinger.Run(); err != nil {
		return nil, err
	}

	// Fetch the statistics once and reuse them for logging, the loss check
	// and the return value.
	stats := pinger.Statistics()
	log.Debugf("[PING] %s: %s±%s %d%% loss", host, stats.AvgRtt, stats.StdDevRtt,
		int(stats.PacketLoss))
	if stats.PacketLoss == 100 { //nolint:gomnd
		return nil, ErrorNoResponse
	}
	return stats, nil
}
// isBWInRange reports whether bw lies within the configured bandwidth
// limits, both bounds inclusive.
func (m UploadManager) isBWInRange(bw float64) bool {
	limits := conf.Bandwidth
	notTooHigh := bw <= limits.Max
	notTooLow := bw >= limits.Min
	return notTooHigh && notTooLow
}
func (m UploadManager) pingWorker() error {
outerLoop:
for {
stats, err := doPing(conf.Host, conf.PPP, time.Second)
if err != nil {
log.Warningf("ping to %s failed: %v", conf.Host, err)
time.Sleep(time.Duration(conf.Interval) * time.Second)
continue
}
if stats.AvgRtt.Milliseconds() >= conf.ThrottlePingThreshold || stats.StdDevRtt.Milliseconds() > conf.ThrottlePingThreshold/2 {
m.LastOver = time.Now()
lastStep := conf.Bandwidth.Step * 0.5
up, err := m.Upload()
if err != nil {
return err
}
log.Infof("ping %s±%s over threshold detected, starting extended pings...", stats.AvgRtt, stats.StdDevRtt)
for stats.AvgRtt.Milliseconds() >= conf.ThrottlePingThreshold || stats.StdDevRtt.Milliseconds() > conf.ThrottlePingThreshold/2 {
stats, err = doPing(conf.Host, conf.ConformationPPP, time.Second)
if err != nil {
log.Warningf("ping to %s failed: %v", conf.Host, err)
time.Sleep(time.Duration(conf.Interval) * time.Minute)
continue outerLoop
}
if stats.AvgRtt.Milliseconds() >= conf.ThrottlePingThreshold || stats.StdDevRtt.Milliseconds() > conf.ThrottlePingThreshold/2 {
lastStep *= 2
newUp := up - lastStep
if !m.isBWInRange(newUp) {
newUp = conf.Bandwidth.Min
}
log.Infof("extended ping %s±%s, adjusting upload TC %f->%f", stats.AvgRtt, stats.StdDevRtt, up, newUp)
up, err = m.SetUpload(newUp)
if err != nil {
return err
}
m.LastOver = time.Now()
if newUp == conf.Bandwidth.Min {
log.Infof("reached lower limit, stopping extended pings")
break
}
}
time.Sleep(5 * time.Second)
}
} else if time.Since(m.LastOver) >= time.Duration(conf.TryRestoringAfter)*time.Minute {
up, err := m.Upload()
if err != nil {
return err
}
if m.isBWInRange(up + conf.Bandwidth.Step) {
log.Infof("short ping %s±%s, trying to increase upload TC %f->%f", stats.AvgRtt, stats.StdDevRtt, up, up+conf.Bandwidth.Step)
up, err = m.SetUpload(up + conf.Bandwidth.Step)
if err != nil {
return err
}
m.LastOver = time.Now()
log.Infof("upload TC increased to %f, measuring latency", up)
stats, err = doPing(conf.Host, conf.ConformationPPP, time.Second)
if err != nil {
log.Warningf("ping to %s failed: %v", conf.Host, err)
if !errors.Is(err, ErrorNoResponse) {
time.Sleep(time.Duration(conf.Interval) * time.Minute)
}
continue
}
if stats.AvgRtt.Milliseconds() >= conf.ThrottlePingThreshold || stats.StdDevRtt.Milliseconds() > conf.ThrottlePingThreshold/2 {
log.Infof("increase failed with %s ping, reverting to %f", stats.AvgRtt, up-conf.Bandwidth.Step)
_, err = m.SetUpload(up - conf.Bandwidth.Step)
if err != nil {
return err
}
} else {
log.Infof("extended ping %s±%s seems stable", stats.AvgRtt, stats.StdDevRtt)
}
}
}
time.Sleep(time.Duration(conf.Interval) * time.Second)
}
}
// main parses flags, loads the YAML configuration, configures logging,
// starts the ping worker in a goroutine and then blocks until SIGINT or
// SIGTERM is received. Any start-up error, and any error returned by the
// worker, terminates the process via log.Fatalf.
func main() {
	flag.Parse()

	killSignals := make(chan os.Signal, 1)
	signal.Notify(killSignals, syscall.SIGINT, syscall.SIGTERM)

	confStr, err := os.ReadFile(*configFlag)
	if err != nil {
		log.Fatalf("Error reading config file: %v", err)
	}
	err = yaml.Unmarshal(confStr, &conf)
	if err != nil {
		log.Fatalf("Error parsing config file: %v", err)
	}
	// conf is a pointer that Unmarshal is expected to allocate; an empty or
	// null document may leave it nil, which would panic on the first field
	// access below.
	if conf == nil {
		log.Fatalf("Error parsing config file: %s contains no configuration", *configFlag)
	}

	lvl, err := log.ParseLevel(conf.LogLevel)
	if err != nil {
		log.Fatalf("Error parsing log level from config: %v", err)
	}
	log.SetLevel(lvl)

	if *journalLog {
		journalhook.Enable()
	}

	um := new(UploadManager)
	go func() {
		err := um.pingWorker()
		if err != nil {
			log.Fatalf("Error in ping-loop: %v", err)
		}
	}()

	// Block until the first termination signal arrives, then exit.
	<-killSignals
}