// Command main watches the latency to a configured host and adjusts the
// bandwidth of the cake qdisc on the upload interface via tc: it lowers the
// bandwidth while pings are over the configured threshold and periodically
// tries to raise it again once they are not.
package main

import (
	"errors"
	"flag"
	"fmt"
	"net"
	"os"
	"os/exec"
	"os/signal"
	"regexp"
	"strconv"
	"syscall"
	"time"

	"github.com/go-ping/ping"
	log "github.com/sirupsen/logrus"
	"github.com/wercker/journalhook"
	"gopkg.in/yaml.v2"
)

// Conf is the configuration loaded from the YAML config file.
type Conf struct {
	// Bandwidth bounds and step size; values are passed to tc as "<n>Mbit".
	Bandwidth struct {
		Max  float64 `yaml:"max"`
		Min  float64 `yaml:"min"`
		Step float64 `yaml:"step"`
	} `yaml:"bandwidth"`
	// Interval is the pause between probe rounds, in seconds.
	// NOTE(review): the failure paths of the confirmation pings sleep
	// Interval *minutes* instead - confirm that this is intended.
	Interval int `yaml:"interval"`
	// Host is the ping target.
	Host string `yaml:"host"`
	// PPP is the number of pings sent per regular probe.
	PPP int `yaml:"ppp"`
	// ConformationPPP is the number of pings sent per confirmation probe.
	ConformationPPP int `yaml:"conformation_ppp"`
	// LogLevel is parsed with logrus' ParseLevel.
	LogLevel string `yaml:"log_level"`
	// UploadInterface is the device handed to tc and the interface whose
	// IPv4 address is used as ping source.
	UploadInterface string `yaml:"upload_interface"`
	// ThrottlePingThreshold is compared against the average RTT in
	// milliseconds; half of it is the limit for the RTT standard deviation.
	ThrottlePingThreshold int64 `yaml:"throttle_ping_threshold"`
	// TryRestoringAfter is the time, in minutes, since LastOver after which
	// a bandwidth increase is attempted.
	TryRestoringAfter int `yaml:"try_restoring_after"`
}

var (
	conf        *Conf
	journalLog  = flag.Bool("journal", false, "Log to systemd journal instead of stdout")
	configFlag  = flag.String("config", "config.yaml", "Set config file to use")
	reBandwidth = regexp.MustCompile(`(?m)bandwidth\s(\d+)Mbit`)

	// ErrorNoResponse is returned by doPing when every packet was lost.
	ErrorNoResponse = errors.New("no response")
)

// UploadManager reads and adjusts the upload bandwidth configured in tc.
type UploadManager struct {
	// LastOver is the last time the latency was over the threshold or the
	// bandwidth was changed by the manager.
	LastOver time.Time
}

// Upload returns the bandwidth (in Mbit) that tc currently reports for the
// configured upload interface.
func (m *UploadManager) Upload() (float64, error) {
	tcCmd := exec.Command("tc", "qdisc", "show", "dev", conf.UploadInterface) //nolint:gosec
	out, err := tcCmd.CombinedOutput()
	if err != nil {
		return 0, fmt.Errorf("running %s: %w", tcCmd.String(), err)
	}
	return parseBandwidth(out)
}

// parseBandwidth extracts the first "bandwidth <n>Mbit" value from tc output.
func parseBandwidth(out []byte) (float64, error) {
	match := reBandwidth.FindSubmatch(out)
	if match == nil {
		return 0, errors.New("could not parse bandwidth from tc")
	}
	up, err := strconv.Atoi(string(match[1]))
	if err != nil {
		return 0, err
	}
	return float64(up), nil
}

// SetUpload changes the cake bandwidth of the upload interface to upload Mbit.
// The value is truncated to an integer when handed to tc; on success the
// untruncated upload value is returned.
func (m *UploadManager) SetUpload(upload float64) (float64, error) {
	tcCmd := exec.Command("tc", "qdisc", "change", "dev", conf.UploadInterface, "root", "cake", "bandwidth", //nolint:gosec
		fmt.Sprintf("%dMbit", int(upload)))
	log.Debugf("[TC] exec %s", tcCmd.String())

	out, err := tcCmd.CombinedOutput()
	if err != nil {
		log.Debugf("[TC] executing %s failed with: %v (%s)", tcCmd.String(), err, out)
		return 0, fmt.Errorf("running %s: %w", tcCmd.String(), err)
	}
	return upload, nil
}

// firstIPv4 returns the string form of the first IPv4 address in addrs, or ""
// if there is none. Entries that are not *net.IPNet are skipped instead of
// triggering a panic on the type assertion.
func firstIPv4(addrs []net.Addr) string {
	for _, addr := range addrs {
		ipNet, ok := addr.(*net.IPNet)
		if !ok || ipNet.IP.To4() == nil {
			continue
		}
		return ipNet.IP.String()
	}
	return ""
}

// doPing sends count privileged pings to host, spaced by interval, using the
// first IPv4 address of the upload interface as source. It returns
// ErrorNoResponse when the packet loss is 100%.
func doPing(host string, count int, interval time.Duration) (*ping.Statistics, error) {
	log.Debugf("[PING] starting to ping %s with %d packages", host, count)

	pinger, err := ping.NewPinger(host)
	if err != nil {
		return nil, err
	}
	pinger.SetPrivileged(true)
	pinger.Timeout = time.Second * time.Duration(count) * 2
	pinger.Count = count
	pinger.Interval = interval

	// Bind the pinger to the upload interface's address.
	nif, err := net.InterfaceByName(conf.UploadInterface)
	if err != nil {
		return nil, err
	}
	addrs, err := nif.Addrs()
	if err != nil {
		return nil, err
	}
	if len(addrs) == 0 {
		return nil, fmt.Errorf("interface %s has no addressed assigned", conf.UploadInterface)
	}
	source := firstIPv4(addrs)
	if source == "" {
		return nil, fmt.Errorf("interface %s has no suitable address assigned", conf.UploadInterface)
	}
	pinger.Source = source

	if err := pinger.Run(); err != nil {
		return nil, err
	}

	stats := pinger.Statistics()
	log.Debugf("[PING] %s: %s±%s %d%% loss", host, stats.AvgRtt, stats.StdDevRtt, int(stats.PacketLoss))
	if stats.PacketLoss == 100 { //nolint:gomnd
		return nil, ErrorNoResponse
	}
	return stats, nil
}

// overThreshold reports whether the average RTT reached the configured
// threshold or the RTT standard deviation exceeded half of it.
func overThreshold(stats *ping.Statistics) bool {
	return stats.AvgRtt.Milliseconds() >= conf.ThrottlePingThreshold ||
		stats.StdDevRtt.Milliseconds() > conf.ThrottlePingThreshold/2
}

// isBWInRange reports whether bw lies within the configured [Min, Max] range.
func (m *UploadManager) isBWInRange(bw float64) bool {
	return bw <= conf.Bandwidth.Max && bw >= conf.Bandwidth.Min
}

// throttle handles a probe that was over the threshold: it keeps running
// confirmation pings and lowers the bandwidth by a step that doubles each
// round until the latency recovers or the lower limit is reached.
//
// skipSleep is true when a confirmation ping failed; the failure pause has
// then already been slept and the caller should start the next round
// immediately.
func (m *UploadManager) throttle(stats *ping.Statistics) (skipSleep bool, err error) {
	m.LastOver = time.Now()
	// Starts at half a step so the first doubling yields exactly one step.
	lastStep := conf.Bandwidth.Step * 0.5

	up, err := m.Upload()
	if err != nil {
		return false, err
	}
	log.Infof("ping %s±%s over threshold detected, starting extended pings...", stats.AvgRtt, stats.StdDevRtt)

	for overThreshold(stats) {
		stats, err = doPing(conf.Host, conf.ConformationPPP, time.Second)
		if err != nil {
			log.Warningf("ping to %s failed: %v", conf.Host, err)
			time.Sleep(time.Duration(conf.Interval) * time.Minute)
			return true, nil
		}

		if overThreshold(stats) {
			lastStep *= 2
			newUp := up - lastStep
			if !m.isBWInRange(newUp) {
				newUp = conf.Bandwidth.Min
			}
			log.Infof("extended ping %s±%s, adjusting upload TC %f->%f", stats.AvgRtt, stats.StdDevRtt, up, newUp)

			up, err = m.SetUpload(newUp)
			if err != nil {
				return false, err
			}
			m.LastOver = time.Now()

			if newUp == conf.Bandwidth.Min {
				log.Infof("reached lower limit, stopping extended pings")
				return false, nil
			}
		}
		time.Sleep(5 * time.Second)
	}
	return false, nil
}

// tryRestore raises the bandwidth by one step if the result stays within the
// configured range, runs a confirmation ping and reverts the increase if that
// ping is over the threshold.
//
// skipSleep is true when the confirmation ping failed; the caller should then
// start the next round without the regular pause.
func (m *UploadManager) tryRestore(stats *ping.Statistics) (skipSleep bool, err error) {
	up, err := m.Upload()
	if err != nil {
		return false, err
	}
	if !m.isBWInRange(up + conf.Bandwidth.Step) {
		return false, nil
	}

	log.Infof("short ping %s±%s, trying to increase upload TC %f->%f",
		stats.AvgRtt, stats.StdDevRtt, up, up+conf.Bandwidth.Step)
	up, err = m.SetUpload(up + conf.Bandwidth.Step)
	if err != nil {
		return false, err
	}
	m.LastOver = time.Now()
	log.Infof("upload TC increased to %f, measuring latency", up)

	stats, err = doPing(conf.Host, conf.ConformationPPP, time.Second)
	if err != nil {
		log.Warningf("ping to %s failed: %v", conf.Host, err)
		if !errors.Is(err, ErrorNoResponse) {
			time.Sleep(time.Duration(conf.Interval) * time.Minute)
		}
		return true, nil
	}

	if !overThreshold(stats) {
		log.Infof("extended ping %s±%s seems stable", stats.AvgRtt, stats.StdDevRtt)
		return false, nil
	}

	log.Infof("increase failed with %s ping, reverting to %f", stats.AvgRtt, up-conf.Bandwidth.Step)
	if _, err = m.SetUpload(up - conf.Bandwidth.Step); err != nil {
		return false, err
	}
	return false, nil
}

// pingWorker runs the probe loop forever: ping the host, throttle when the
// latency is over the threshold, otherwise try to restore bandwidth once
// TryRestoringAfter minutes have passed since LastOver. It only returns when
// reading or changing the tc configuration fails.
func (m *UploadManager) pingWorker() error {
	for {
		stats, err := doPing(conf.Host, conf.PPP, time.Second)
		if err != nil {
			log.Warningf("ping to %s failed: %v", conf.Host, err)
			time.Sleep(time.Duration(conf.Interval) * time.Second)
			continue
		}

		var skipSleep bool
		switch {
		case overThreshold(stats):
			skipSleep, err = m.throttle(stats)
		case time.Since(m.LastOver) >= time.Duration(conf.TryRestoringAfter)*time.Minute:
			skipSleep, err = m.tryRestore(stats)
		}
		if err != nil {
			return err
		}
		if skipSleep {
			continue
		}

		time.Sleep(time.Duration(conf.Interval) * time.Second)
	}
}

// main loads the configuration, sets up logging, starts the ping worker and
// blocks until SIGINT or SIGTERM is received.
func main() {
	flag.Parse()

	killSignals := make(chan os.Signal, 1)
	signal.Notify(killSignals, syscall.SIGINT, syscall.SIGTERM)

	confStr, err := os.ReadFile(*configFlag)
	if err != nil {
		log.Fatalf("Error reading config file: %v", err)
	}

	err = yaml.Unmarshal(confStr, &conf)
	if err != nil {
		log.Fatalf("Error parsing config file: %v", err)
	}
	// An empty YAML document decodes without error but leaves conf nil.
	if conf == nil {
		log.Fatalf("Error parsing config file: %s contains no configuration", *configFlag)
	}

	lvl, err := log.ParseLevel(conf.LogLevel)
	if err != nil {
		log.Fatalf("Error parsing log level from config: %v", err)
	}
	log.SetLevel(lvl)

	if *journalLog {
		journalhook.Enable()
	}

	um := new(UploadManager)
	go func() {
		if err := um.pingWorker(); err != nil {
			log.Fatalf("Error in ping-loop: %v", err)
		}
	}()

	<-killSignals
}