// This program periodically pings a host and, depending on the measured
// round-trip time, lowers or raises the bandwidth of a cake qdisc on the
// configured interface by invoking the tc command.
package main

import (
	"flag"
	"fmt"
	"os"
	"os/exec"
	"os/signal"
	"regexp"
	"strconv"
	"strings"
	"syscall"
	"time"

	"github.com/go-ping/ping"
	log "github.com/sirupsen/logrus"
	"github.com/wercker/journalhook"
	"gopkg.in/yaml.v2"
)

// Conf mirrors the structure of config.yaml.
type Conf struct {
	// Bandwidth bounds the values passed to tc (formatted as "<n>Mbit") and
	// defines how far a single adjustment moves the current value.
	Bandwidth struct {
		Max  int `yaml:"max"`
		Min  int `yaml:"min"`
		Step int `yaml:"step"`
	} `yaml:"bandwidth"`
	// Interval is the pause between two probe rounds, in minutes. Fractional
	// values are honoured.
	Interval float64 `yaml:"interval"`
	// Host is the ping target.
	Host string `yaml:"host"`
	// PPP is the number of echo requests sent in a regular probe.
	PPP int `yaml:"ppp"`
	// ConformationPPP is the number of echo requests sent when re-checking a
	// probe that exceeded the threshold.
	ConformationPPP int `yaml:"conformation_ppp"`
	// LogLevel is parsed with logrus' ParseLevel.
	LogLevel string `yaml:"log_level"`
	// UploadInterface is the device name handed to tc.
	UploadInterface string `yaml:"upload_interface"`
	// CakeOptions holds additional whitespace-separated arguments appended
	// to the "tc qdisc add ... cake" command line.
	CakeOptions string `yaml:"cake_options"`
	// ThrottlePingThreshold is the average RTT, in milliseconds, at or above
	// which the bandwidth is reduced.
	ThrottlePingThreshold int64 `yaml:"throttle_ping_threshold"`
	// TryRestoringAfter is the number of minutes that must have passed since
	// LastOver before the bandwidth is raised again.
	TryRestoringAfter int `yaml:"try_restoring_after"`
}

var (
	// conf is the process-wide configuration, populated once in main.
	conf *Conf

	journalLog = flag.Bool("journal", false, "Log to systemd journal instead of stdout")

	// reBandwidth extracts the integer Mbit value from "tc qdisc show" output.
	reBandwidth = regexp.MustCompile(`(?m)bandwidth\s(\d+)Mbit`)
)

// UploadManager reads and adjusts the cake bandwidth of conf.UploadInterface.
type UploadManager struct {
	// LastOver is refreshed whenever a probe exceeds the threshold and after
	// every bandwidth change; restore attempts are timed relative to it.
	LastOver time.Time
}

// Upload returns the bandwidth, in Mbit, currently reported by
// "tc qdisc show dev <interface>". It fails if tc cannot be run or its output
// contains no "bandwidth <n>Mbit" entry.
func (m *UploadManager) Upload() (int, error) {
	out, err := exec.Command("tc", "qdisc", "show", "dev", conf.UploadInterface).CombinedOutput()
	if err != nil {
		return 0, fmt.Errorf("tc qdisc show dev %s: %w", conf.UploadInterface, err)
	}

	// Only the first occurrence is used.
	match := reBandwidth.FindStringSubmatch(string(out))
	if match == nil {
		return 0, fmt.Errorf("could not parse bandwidth from tc")
	}
	up, err := strconv.Atoi(match[1])
	if err != nil {
		return 0, fmt.Errorf("parsing tc bandwidth %q: %w", match[1], err)
	}
	return up, nil
}

// SetUpload removes the root qdisc of the interface and installs a cake qdisc
// limited to upload Mbit, followed by the configured extra cake options.
func (m *UploadManager) SetUpload(upload int) error {
	m.clearTC()

	args := []string{
		"qdisc", "add", "dev", conf.UploadInterface, "root", "cake",
		"bandwidth", fmt.Sprintf("%dMbit", upload),
	}
	// strings.Fields, unlike strings.Split, yields no empty elements for an
	// empty option string or for repeated spaces, so tc never receives an
	// empty argument.
	args = append(args, strings.Fields(conf.CakeOptions)...)

	tcCmd := exec.Command("tc", args...)
	log.Debugf("[TC] exec %s", tcCmd.String())
	if out, err := tcCmd.CombinedOutput(); err != nil {
		return fmt.Errorf("executing %s: %w (%s)", tcCmd.String(), err, strings.TrimSpace(string(out)))
	}
	return nil
}

// clearTC deletes the root qdisc of the interface. Failures are logged at
// debug level only and otherwise ignored.
func (m *UploadManager) clearTC() {
	tcCmd := exec.Command("tc", "qdisc", "del", "dev", conf.UploadInterface, "root")
	log.Debugf("[TC] exec %s", tcCmd.String())
	if out, err := tcCmd.CombinedOutput(); err != nil {
		log.Debugf("[TC] executing %s failed with: %v (%s)", tcCmd.String(), err, out)
	}
}

// doPing sends count privileged echo requests to host, spaced by interval,
// and returns the collected statistics.
//
// NOTE(review): no Timeout is set on the pinger, so how long Run blocks when
// replies are lost depends on the library default - confirm against go-ping.
func doPing(host string, count int, interval time.Duration) (*ping.Statistics, error) {
	pinger, err := ping.NewPinger(host)
	if err != nil {
		return nil, err
	}
	pinger.SetPrivileged(true)
	pinger.Count = count
	pinger.Interval = interval
	if err := pinger.Run(); err != nil {
		return nil, err
	}
	return pinger.Statistics(), nil
}

// isBWInRange reports whether bw lies within the configured [Min, Max] range.
func (m *UploadManager) isBWInRange(bw int) bool {
	return bw >= conf.Bandwidth.Min && bw <= conf.Bandwidth.Max
}

// pollInterval converts conf.Interval (minutes, possibly fractional) into a
// Duration. The multiplication happens in floating point so that values such
// as 0.5 are not truncated to zero.
func pollInterval() time.Duration {
	return time.Duration(conf.Interval * float64(time.Minute))
}

// pingWorker runs probe rounds forever, sleeping pollInterval between them.
// It only returns when reading or changing the tc configuration fails.
func (m *UploadManager) pingWorker() error {
	for {
		if err := m.probe(); err != nil {
			return err
		}
		time.Sleep(pollInterval())
	}
}

// probe performs one measurement round and triggers a throttle or restore
// step when warranted. Ping failures are logged and treated as "nothing to
// do"; only tc failures are returned.
func (m *UploadManager) probe() error {
	stats, err := doPing(conf.Host, conf.PPP, time.Second)
	if err != nil {
		log.Warningf("ping to %s failed: %v", conf.Host, err)
		return nil
	}
	log.Debugf("[PING] %s±%s", stats.AvgRtt, stats.StdDevRtt)

	switch {
	case stats.AvgRtt.Milliseconds() >= conf.ThrottlePingThreshold:
		return m.throttle(stats.AvgRtt)
	case time.Since(m.LastOver) >= time.Duration(conf.TryRestoringAfter)*time.Minute:
		return m.restore()
	}
	return nil
}

// throttle handles a probe whose average RTT (rtt) reached the threshold: it
// re-measures with ConformationPPP pings and, if the threshold is still
// reached, lowers the bandwidth by one step.
func (m *UploadManager) throttle(rtt time.Duration) error {
	m.LastOver = time.Now()

	up, err := m.Upload()
	if err != nil {
		return err
	}
	target := up - conf.Bandwidth.Step
	if !m.isBWInRange(target) {
		log.Infof("Ping over threshold (%s) detected, but no bandwidth available", rtt)
		m.LastOver = time.Now()
		return nil
	}

	log.Infof("Ping over threshold (%s) detected, confirming...", rtt)
	stats, err := doPing(conf.Host, conf.ConformationPPP, time.Second)
	if err != nil {
		log.Warningf("ping to %s failed: %v", conf.Host, err)
		return nil
	}
	if stats.AvgRtt.Milliseconds() < conf.ThrottlePingThreshold {
		// The second measurement did not confirm the first; leave tc alone.
		return nil
	}

	log.Infof("Ping confirmed (%s), adjusting upload TC %d->%d", stats.AvgRtt, up, target)
	if err := m.SetUpload(target); err != nil {
		return err
	}
	m.LastOver = time.Now()
	return nil
}

// restore raises the bandwidth by one step if the result stays within range.
// LastOver is reset afterwards so the next increase waits a full
// TryRestoringAfter period again.
func (m *UploadManager) restore() error {
	up, err := m.Upload()
	if err != nil {
		return err
	}
	target := up + conf.Bandwidth.Step
	if !m.isBWInRange(target) {
		return nil
	}

	log.Infof("trying to increase upload TC %d->%d", up, target)
	if err := m.SetUpload(target); err != nil {
		return err
	}
	m.LastOver = time.Now()
	return nil
}

func main() {
	killSignals := make(chan os.Signal, 1)
	signal.Notify(killSignals, syscall.SIGINT, syscall.SIGTERM)

	// SIGUSR1 is subscribed to, but nothing reads from this channel: no
	// reload handling exists in this file.
	reloadSignals := make(chan os.Signal, 1)
	signal.Notify(reloadSignals, syscall.SIGUSR1)

	flag.Parse()

	confStr, err := os.ReadFile("config.yaml")
	if err != nil {
		log.Fatalf("Error reading config file: %v", err)
	}
	// Allocate before decoding so that an empty config file leaves a zero
	// Conf behind instead of a nil pointer that is dereferenced below.
	conf = new(Conf)
	if err := yaml.Unmarshal(confStr, conf); err != nil {
		log.Fatalf("Error parsing config file: %v", err)
	}

	lvl, err := log.ParseLevel(conf.LogLevel)
	if err != nil {
		log.Fatalf("Error parsing log level from config: %v", err)
	}
	log.SetLevel(lvl)
	if *journalLog {
		journalhook.Enable()
	}

	um := new(UploadManager)
	go func() {
		if err := um.pingWorker(); err != nil {
			log.Fatalf("Error in ping-loop: %v", err)
		}
	}()

	// Block until SIGINT or SIGTERM arrives, then let main return.
	<-killSignals
}