package main import ( "encoding/binary" "encoding/json" "github.com/nanobox-io/golang-scribble" "math" "net" "os" "os/signal" "syscall" "fmt" "gen/ledd" "github.com/golang/protobuf/proto" "github.com/lucasb-eyer/go-colorful" "github.com/op/go-logging" "gopkg.in/yaml.v2" "io/ioutil" ) // CONSTANTS const VERSION = "0.1" const LOG_BACKEND = "BH" const LOG_CLIENTS = "CH" // STRUCTS type Config struct { Name string Daemon struct { Frontend struct { Host string Port int } Backend struct { Host string Port int } } Mongodb struct { Host string Port int Database string } } type BackendManager struct { backends map[string]*Backend broadcast chan []byte register chan *Backend unregister chan *Backend } type Backend struct { name string platformType string version string channel int32 resolution int32 socket net.Conn data chan []byte } type ClientManager struct { clients map[*Client]bool broadcast chan []byte register chan *Client unregister chan *Client } type Client struct { platform string socket net.Conn data chan []byte } type LED struct { Name string Channel []int32 Backend string color chan colorful.Color } type LEDManager struct { leds map[string]*LED broadcast chan colorful.Color add chan *LED remove chan *LED } // GLOBAL VARS var log = logging.MustGetLogger("LedD") var backManager BackendManager var clientManager ClientManager var ledManager LEDManager var db *scribble.Driver var config Config // SOCKET SETUP func setupSocket(host string, port int, logTag string, backend bool) (func(), error) { ln, err := net.Listen("tcp4", fmt.Sprintf("%s:%d", host, port)) if err != nil { return nil, err } log.Infof("[%s] Ready to handle connections.", logTag) return func() { for { conn, err := ln.Accept() if err != nil { log.Warningf("%s", err) } log.Infof("[%s] New connection from %s", logTag, conn.RemoteAddr()) if backend { backend := &Backend{socket: conn, data: make(chan []byte, 20)} go backManager.receive(backend) go backManager.send(backend) } else { client := &Client{socket: conn, data: make(chan []byte, 20)} go clientManager.receive(client) go clientManager.send(client) } } }, nil } // LED MANAGER func (manager *LEDManager) start() { for { select { case led := <-manager.add: log.Debugf("[%s] Request to add LED: %s (%s)", led.Backend, led.Name, led.Channel) if led.Name == "" || len(led.Channel) == 0 || led.Backend == "" { log.Warningf("[%s] Can't add LED without required information! (%s)", LOG_CLIENTS, led) continue } if _, ok := manager.leds[led.Name]; ok { log.Warningf("[%s] Can't add LED: already in LEDM! (%s)", LOG_CLIENTS, led.Name) continue } var dbLED LED err := db.Read("led", led.Name, &dbLED) if err == nil { // log.Warningf("[%s] LED already in DB! (%s)", LOG_CLIENTS, led.Name) } else if os.IsNotExist(err) { err = db.Write("led", led.Name, led) if err != nil { log.Warning("[%s] Error while adding LED to database: %s", LOG_BACKEND, err) } } else { log.Warning("[%s] Error while checking database for LED: %s", LOG_BACKEND, err) } manager.leds[led.Name] = led go manager.color(led) case led := <-manager.remove: if _, ok := manager.leds[led.Name]; ok { log.Debugf("[%s] Request to remove %s", led.Backend, led.Name) err := db.Delete("led", led.Name) check(err) delete(manager.leds, led.Name) } case color := <-manager.broadcast: for _, led := range manager.leds { select { case led.color <- color: } } } } } func (manager *LEDManager) color(led *LED) { for { select { case color := <-led.color: if backend, ok := backManager.backends[led.Backend]; ok { if len(led.Channel) != 3 { log.Warningf("[%s] Currently only RGB LEDs are supported", led.Name) return } cMap := make(map[int32]int32) if !color.IsValid() { log.Warningf("[%s] Got invalid HCL->RGB color, clamping!", led.Name) color = color.Clamped() } log.Debugf("[%s] New color: \x1b[38;2;%d;%d;%dm%s\x1b[0m", led.Name, int(math.Round(color.R*255)), int(math.Round(color.G*255)), int(math.Round(color.B*255)), color.Hex()) cMap[led.Channel[0]] = int32(color.R * float64(backend.resolution)) cMap[led.Channel[1]] = int32(color.G * float64(backend.resolution)) cMap[led.Channel[2]] = int32(color.B * float64(backend.resolution)) wrapperMsg := &ledd.BackendWrapperMessage{ Msg: &ledd.BackendWrapperMessage_MSetChannel{ MSetChannel: &ledd.BackendSetChannel{ NewChannelValues: cMap}}} data, err := proto.Marshal(wrapperMsg) if err != nil { log.Warningf("[%s] Failed to encode protobuf msg to %s: %s", led.Name, backend.name, err) } backend.data <- prepareProtobuf(data) } else { log.Warningf("[LM] Failed to set color for %s: backend %s not found", led.Name, led.Backend) } } } } // BACKEND HANDLER func (manager *BackendManager) start() { for { select { case backend := <-manager.register: manager.backends[backend.name] = backend log.Debugf("[%s] %s registered", LOG_BACKEND, backend.niceName()) wrapperMsg := &ledd.BackendWrapperMessage{ Msg: &ledd.BackendWrapperMessage_MLedd{ MLedd: &ledd.LedD{ Name: config.Name, }, }, } data, err := proto.Marshal(wrapperMsg) if err != nil { log.Warningf("[%s] Failed to encode protobuf: %s", backend.niceName(), err) } backend.data <- prepareProtobuf(data) case backend := <-manager.unregister: if _, ok := manager.backends[backend.name]; ok { log.Debugf("[%s] %s removed: connection terminated", LOG_BACKEND, backend.socket.RemoteAddr()) close(backend.data) delete(manager.backends, backend.name) } case message := <-manager.broadcast: for _, backend := range manager.backends { select { case backend.data <- message: default: close(backend.data) delete(manager.backends, backend.name) } } } } } func (manager *BackendManager) stop() { for _, backend := range manager.backends { close(backend.data) } } func (manager *BackendManager) send(backend *Backend) { defer backend.socket.Close() for { select { case message, ok := <-backend.data: if !ok { return } backend.socket.Write(message) } } } func (manager *BackendManager) receive(backend *Backend) { for { message := make([]byte, 4096) length, err := backend.socket.Read(message) if err != nil { log.Warningf("[%s] Read failed: %s", backend.niceName(), err) manager.unregister <- backend backend.socket.Close() break } if length > 0 { msgLen := binary.BigEndian.Uint32(message[0:4]) // log.Debugf("[%s] Read %d bytes, first protobuf is %d long", Backend.niceName(), length, msgLen) backendMsg := &ledd.BackendWrapperMessage{} err = proto.Unmarshal(message[4:msgLen+4], backendMsg) if err != nil { log.Warningf("[%s] Couldn't decode protobuf msg!", backend.niceName()) continue } switch msg := backendMsg.Msg.(type) { case *ledd.BackendWrapperMessage_MBackend: nBackend := msg.MBackend backend.name = nBackend.Name backend.channel = nBackend.Channel backend.resolution = nBackend.Resolution backend.platformType = nBackend.Type backend.version = nBackend.Version log.Infof("[%s] %s is now identified as %s", LOG_BACKEND, backend.socket.RemoteAddr(), backend.niceName()) backManager.register <- backend } } } } // CLIENT HANDLER func (manager *ClientManager) start() { for { select { case client := <-manager.register: manager.clients[client] = true log.Debugf("[%s] Client %s (%s) registered", LOG_CLIENTS, client.socket.RemoteAddr(), client.platform) backends := make([]*ledd.Backend, 0, len(backManager.backends)) leds := make([]*ledd.LED, 0, len(ledManager.leds)) for _, led := range ledManager.leds { leds = append(leds, &ledd.LED{ Name: led.Name, }) } for _, backend := range backManager.backends { backends = append(backends, &ledd.Backend{ Name: backend.name, Channel: backend.channel, Resolution: backend.resolution, Type: backend.platformType, Version: backend.version, }) } wrapperMsg := &ledd.ClientWrapperMessage{ Leds: leds, Backends: backends, Msg: &ledd.ClientWrapperMessage_MLedd{ MLedd: &ledd.LedD{ Name: config.Name, }, }, } data, err := proto.Marshal(wrapperMsg) if err != nil { log.Warningf("[%s] Failed to encode protobuf msg: %s", client.socket.RemoteAddr(), err) } client.data <- prepareProtobuf(data) case client := <-manager.unregister: if _, ok := manager.clients[client]; ok { log.Debugf("[%s] %s (%s) removed", LOG_CLIENTS, client.socket.RemoteAddr(), client.platform) close(client.data) delete(manager.clients, client) } case message := <-manager.broadcast: for connection := range manager.clients { select { case connection.data <- message: default: close(connection.data) delete(manager.clients, connection) } } } } } func (manager *ClientManager) send(client *Client) { defer client.socket.Close() for { select { case message, ok := <-client.data: if !ok { return } client.socket.Write(message) } } } func (manager *ClientManager) receive(client *Client) { for { message := make([]byte, 4096) length, err := client.socket.Read(message) if err != nil { log.Warningf("[%s] Read failed: %s", client.socket.RemoteAddr(), err) manager.unregister <- client client.socket.Close() break } if length > 0 { for i := 0; i < length; { msgLen := int(binary.BigEndian.Uint32(message[i : i+4])) // log.Debugf("[%s] Reading protobuf after %d (len=%d)", client.socket.RemoteAddr(), i+4, msgLen) clientMsg := &ledd.ClientWrapperMessage{} err = proto.Unmarshal(message[i+4:i+msgLen+4], clientMsg) i += msgLen + 4 if err != nil { log.Warningf("[%s] Couldn't decode protobuf msg!", client.socket.RemoteAddr()) continue } switch msg := clientMsg.Msg.(type) { case *ledd.ClientWrapperMessage_MClient: client.platform = msg.MClient.Type log.Infof("[%s] %s is now identified as client (%s)", LOG_CLIENTS, client.socket.RemoteAddr(), client.platform) clientManager.register <- client case *ledd.ClientWrapperMessage_MGetLed: allLED := make([]*ledd.LED, 0) for _, led := range ledManager.leds { allLED = append(allLED, &ledd.LED{Name: led.Name}) } data, err := proto.Marshal(&ledd.ClientWrapperMessage{Leds: allLED}) if err != nil { log.Errorf("[%s] Error encoding protobuf: %s", client.socket.RemoteAddr(), err) break } client.data <- prepareProtobuf(data) case *ledd.ClientWrapperMessage_MAddLed: backend, ok := backManager.backends[msg.MAddLed.Backend] if !ok { log.Warningf("[%s] Can't add LED for non-existing backend %s", client.socket.RemoteAddr(), msg.MAddLed.Backend) break } if _, ok := ledManager.leds[msg.MAddLed.Name]; ok { log.Warningf("[%s] Can't add LED with exisiting name %s", client.socket.RemoteAddr(), msg.MAddLed.Name) break } nLED := LED{ Name: msg.MAddLed.Name, Channel: msg.MAddLed.Channel, Backend: backend.name, color: make(chan colorful.Color, 20), } ledManager.add <- &nLED case *ledd.ClientWrapperMessage_MSetLed: leds := clientMsg.Leds if len(leds) == 0 { log.Warningf("[%s] Got setLED with no LEDs attached!", client.socket.RemoteAddr()) break } for _, pLED := range leds { led, ok := ledManager.leds[pLED.Name] if !ok { log.Warningf("[%s] Failed to set %s: not found", client.socket.RemoteAddr(), pLED.Name) break } // log.Debugf("[%s] Set %s to %s", client.socket.RemoteAddr(), led.Name, colorful.Hcl(msg.MSetLed.Colour.Hue, msg.MSetLed.Colour.Chroma, msg.MSetLed.Colour.Light)) if pLED.Color == nil { led.color <- colorful.Hcl(msg.MSetLed.Colour.Hue, msg.MSetLed.Colour.Chroma, msg.MSetLed.Colour.Light) } else { led.color <- colorful.Hcl(pLED.Color.Hue, pLED.Color.Chroma, pLED.Color.Light) } } case *ledd.ClientWrapperMessage_MSetDirect: backend, ok := backManager.backends[msg.MSetDirect.Backend] if !ok { log.Warningf("[%s] Can't set channel for non-existing backend %s", client.socket.RemoteAddr(), msg.MSetDirect.Backend) break } backend.setChannel(msg.MSetDirect.Channel, msg.MSetDirect.Value) case *ledd.ClientWrapperMessage_MRemoveLed: led, ok := ledManager.leds[msg.MRemoveLed.Name] if !ok { log.Warningf("[%s] Failed to remove %s: not found", client.socket.RemoteAddr(), msg.MRemoveLed.Name) break } ledManager.remove <- led } } } } } // HELPER func check(e error) { if e != nil { panic(e) } } func (backend *Backend) niceName() string { if backend.name != "" { return backend.name } else { return backend.socket.RemoteAddr().String() } } func (backend Backend) setChannel(channel int32, val int32) { cMap := make(map[int32]int32) cMap[channel] = val wrapperMsg := &ledd.BackendWrapperMessage{ Msg: &ledd.BackendWrapperMessage_MSetChannel{ MSetChannel: &ledd.BackendSetChannel{ NewChannelValues: cMap}}} data, err := proto.Marshal(wrapperMsg) if err != nil { log.Warningf("[%s] Failed to encode protobuf msg: %s", backend.niceName(), err) } backend.data <- prepareProtobuf(data) } func prepareProtobuf(data []byte) []byte { size := make([]byte, 4) binary.BigEndian.PutUint32(size, uint32(len(data))) return append(size, data...) } // MAIN func main() { killSignals := make(chan os.Signal, 1) signal.Notify(killSignals, syscall.SIGINT, syscall.SIGTERM) log.Info("LedD", VERSION) content, err := ioutil.ReadFile("ledd.yaml") check(err) err = yaml.Unmarshal(content, &config) check(err) db, err = scribble.New("db", nil) check(err) backManager = BackendManager{ backends: make(map[string]*Backend), broadcast: make(chan []byte, 20), register: make(chan *Backend, 10), unregister: make(chan *Backend, 10), } go backManager.start() clientManager = ClientManager{ clients: make(map[*Client]bool), broadcast: make(chan []byte, 20), register: make(chan *Client, 10), unregister: make(chan *Client, 10), } go clientManager.start() ledManager = LEDManager{ leds: make(map[string]*LED), broadcast: make(chan colorful.Color, 10), add: make(chan *LED, 10), remove: make(chan *LED, 10), } go ledManager.start() leds, err := db.ReadAll("led") if os.IsNotExist(err) { log.Infof("No DB found.") } else { check(err) } for _, ledJson := range leds { led := LED{} err = json.Unmarshal([]byte(ledJson), &led) check(err) led.color = make(chan colorful.Color, 20) ledManager.add <- &led } backendThread, err := setupSocket(config.Daemon.Backend.Host, config.Daemon.Backend.Port, LOG_BACKEND, true) check(err) go backendThread() frontendThread, err := setupSocket(config.Daemon.Frontend.Host, config.Daemon.Frontend.Port, LOG_CLIENTS, false) check(err) go frontendThread() log.Infof("All connection handler ready.") <-killSignals backManager.stop() }