Files
LedD.GO/ledd.go

603 lines
15 KiB
Go

package main
import (
"encoding/binary"
"gopkg.in/mgo.v2"
"gopkg.in/mgo.v2/bson"
"net"
"os"
"os/signal"
"syscall"
"fmt"
"gen/ledd"
"github.com/golang/protobuf/proto"
"github.com/lucasb-eyer/go-colorful"
"github.com/op/go-logging"
"gopkg.in/yaml.v2"
"io/ioutil"
)
// CONSTANTS
const VERSION = "0.1"
const LOG_BACKEND = "BH"
const LOG_CLIENTS = "CH"
// STRUCTS
type Config struct {
Name string
Daemon struct {
Frontend struct {
Host string
Port int
}
Backend struct {
Host string
Port int
}
}
Mongodb struct {
Host string
Port int
Database string
}
}
type BackendManager struct {
backends map[string]*Backend
broadcast chan []byte
register chan *Backend
unregister chan *Backend
}
type Backend struct {
name string
platformType string
version string
channel int32
resolution int32
socket net.Conn
data chan []byte
}
type ClientManager struct {
clients map[*Client]bool
broadcast chan []byte
register chan *Client
unregister chan *Client
}
type Client struct {
platform string
socket net.Conn
data chan []byte
}
type LED struct {
Name string
Channel []int32
Backend string
color chan colorful.Color
}
type LEDManager struct {
leds map[string]*LED
broadcast chan colorful.Color
add chan *LED
remove chan *LED
}
// GLOBAL VARS
var log = logging.MustGetLogger("LedD")
var backManager = BackendManager{}
var clientManager = ClientManager{}
var ledManager = LEDManager{}
var LEDCollection = &mgo.Collection{}
var config = Config{}
// SOCKET SETUP
func setupSocket(host string, port int, logTag string, backend bool) (func(), error) {
ln, err := net.Listen("tcp4", fmt.Sprintf("%s:%d", host, port))
if err != nil {
return nil, err
}
log.Infof("[%s] Ready to handle connections.", logTag)
return func() {
for {
conn, err := ln.Accept()
if err != nil {
log.Warningf("%s", err)
}
log.Infof("[%s] New connection from %s", logTag, conn.RemoteAddr())
if backend {
backend := &Backend{socket: conn, data: make(chan []byte, 20)}
go backManager.receive(backend)
go backManager.send(backend)
} else {
client := &Client{socket: conn, data: make(chan []byte, 20)}
go clientManager.receive(client)
go clientManager.send(client)
}
}
}, nil
}
// LED MANAGER
func (manager *LEDManager) start() {
for {
select {
case led := <-manager.add:
log.Debugf("[%s] Request to add LED: %s (%s)", led.Backend, led.Name, led.Channel)
if led.Name == "" || len(led.Channel) == 0 || led.Backend == "" {
log.Warningf("[%s] Can't add LED without required information! (%s)", LOG_CLIENTS, led)
continue
}
if _, ok := manager.leds[led.Name]; ok {
log.Warningf("[%s] Can't add LED: already existent! (%s)", LOG_CLIENTS, led.Name)
continue
}
manager.leds[led.Name] = led
go manager.color(led)
err := LEDCollection.Insert(led)
if err != nil {
log.Warning("[%s] Error while adding LED to database: %s", LOG_BACKEND, err)
}
case led := <-manager.remove:
if _, ok := manager.leds[led.Name]; ok {
log.Debugf("[%s] Request to remove %s", led.Backend, led.Name)
LEDCollection.RemoveAll(bson.M{"name": led.Name})
delete(manager.leds, led.Name)
}
case color := <-manager.broadcast:
for _, led := range manager.leds {
select {
case led.color <- color:
}
}
}
}
}
func (manager *LEDManager) color(led *LED) {
for {
select {
case color := <-led.color:
if backend, ok := backManager.backends[led.Backend]; ok {
if len(led.Channel) != 3 {
log.Warningf("[%s] Currently only RGB LEDs are supported", led.Name)
return
}
cMap := make(map[int32]int32)
if !color.IsValid() {
color = color.Clamped()
}
cMap[led.Channel[0]] = int32(color.R * float64(backend.resolution))
cMap[led.Channel[1]] = int32(color.G * float64(backend.resolution))
cMap[led.Channel[2]] = int32(color.B * float64(backend.resolution))
wrapperMsg := &ledd.BackendWrapperMessage{
Msg: &ledd.BackendWrapperMessage_MSetChannel{
MSetChannel: &ledd.BackendSetChannel{
NewChannelValues: cMap}}}
data, err := proto.Marshal(wrapperMsg)
if err != nil {
log.Warningf("[%s] Failed to encode protobuf msg to %s: %s", led.Name, backend.name, err)
}
backend.data <- prepareProtobuf(data)
} else {
log.Warningf("[LM] Failed to set color for %s: backend %s not found", led.Name, led.Backend)
}
}
}
}
// BACKEND HANDLER
func (manager *BackendManager) start() {
for {
select {
case backend := <-manager.register:
manager.backends[backend.name] = backend
log.Debugf("[%s] %s registered", LOG_BACKEND, backend.niceName())
wrapperMsg := &ledd.BackendWrapperMessage{
Msg: &ledd.BackendWrapperMessage_MLedd{
MLedd: &ledd.LedD{
Name: config.Name,
},
},
}
data, err := proto.Marshal(wrapperMsg)
if err != nil {
log.Warningf("[%s] Failed to encode protobuf: %s", backend.niceName(), err)
}
backend.data <- prepareProtobuf(data)
case backend := <-manager.unregister:
if _, ok := manager.backends[backend.name]; ok {
log.Debugf("[%s] %s removed: connection terminated", LOG_BACKEND, backend.socket.RemoteAddr())
close(backend.data)
delete(manager.backends, backend.name)
}
case message := <-manager.broadcast:
for _, backend := range manager.backends {
select {
case backend.data <- message:
default:
close(backend.data)
delete(manager.backends, backend.name)
}
}
}
}
}
func (manager *BackendManager) stop() {
for _, backend := range manager.backends {
close(backend.data)
}
}
func (manager *BackendManager) send(backend *Backend) {
defer backend.socket.Close()
for {
select {
case message, ok := <-backend.data:
if !ok {
return
}
backend.socket.Write(message)
}
}
}
func (manager *BackendManager) receive(backend *Backend) {
for {
message := make([]byte, 4096)
length, err := backend.socket.Read(message)
if err != nil {
log.Warningf("[%s] Read failed: %s", backend.niceName(), err)
manager.unregister <- backend
backend.socket.Close()
break
}
if length > 0 {
msgLen := binary.BigEndian.Uint32(message[0:4])
// log.Debugf("[%s] Read %d bytes, first protobuf is %d long", Backend.niceName(), length, msgLen)
backendMsg := &ledd.BackendWrapperMessage{}
err = proto.Unmarshal(message[4:msgLen+4], backendMsg)
if err != nil {
log.Warningf("[%s] Couldn't decode protobuf msg!", backend.niceName())
continue
}
switch msg := backendMsg.Msg.(type) {
case *ledd.BackendWrapperMessage_MBackend:
nBackend := msg.MBackend
backend.name = nBackend.Name
backend.channel = nBackend.Channel
backend.resolution = nBackend.Resolution
backend.platformType = nBackend.Type
backend.version = nBackend.Version
log.Infof("[%s] %s is now identified as %s", LOG_BACKEND, backend.socket.RemoteAddr(), backend.niceName())
backManager.register <- backend
}
}
}
}
// CLIENT HANDLER
func (manager *ClientManager) start() {
for {
select {
case client := <-manager.register:
manager.clients[client] = true
log.Debugf("[%s] Client %s (%s) registered", LOG_CLIENTS, client.socket.RemoteAddr(), client.platform)
backends := make([]*ledd.Backend, 0, len(backManager.backends))
leds := make([]*ledd.LED, 0, len(ledManager.leds))
for _, led := range ledManager.leds {
leds = append(leds, &ledd.LED{
Name: led.Name,
})
}
for _, backend := range backManager.backends {
backends = append(backends, &ledd.Backend{
Name: backend.name,
Channel: backend.channel,
Resolution: backend.resolution,
Type: backend.platformType,
Version: backend.version,
})
}
wrapperMsg := &ledd.ClientWrapperMessage{
Leds: leds,
Backends: backends,
Msg: &ledd.ClientWrapperMessage_MLedd{
MLedd: &ledd.LedD{
Name: config.Name,
},
},
}
data, err := proto.Marshal(wrapperMsg)
if err != nil {
log.Warningf("[%s] Failed to encode protobuf msg: %s", client.socket.RemoteAddr(), err)
}
client.data <- prepareProtobuf(data)
case client := <-manager.unregister:
if _, ok := manager.clients[client]; ok {
log.Debugf("[%s] %s (%s) removed", LOG_CLIENTS, client.socket.RemoteAddr(), client.platform)
close(client.data)
delete(manager.clients, client)
}
case message := <-manager.broadcast:
for connection := range manager.clients {
select {
case connection.data <- message:
default:
close(connection.data)
delete(manager.clients, connection)
}
}
}
}
}
func (manager *ClientManager) send(client *Client) {
defer client.socket.Close()
for {
select {
case message, ok := <-client.data:
if !ok {
return
}
client.socket.Write(message)
}
}
}
func (manager *ClientManager) receive(client *Client) {
for {
message := make([]byte, 4096)
length, err := client.socket.Read(message)
if err != nil {
log.Warningf("[%s] Read failed: %s", client.socket.RemoteAddr(), err)
manager.unregister <- client
client.socket.Close()
break
}
if length > 0 {
//log.Debugf("[%s] Read %d bytes", client.socket.RemoteAddr(), length)
for i := 0; i < length; {
msgLen := int(binary.BigEndian.Uint32(message[i:i+4]))
// log.Debugf("[%s] Reading protobuf after %d (len=%d)", client.socket.RemoteAddr(), i+4, msgLen)
clientMsg := &ledd.ClientWrapperMessage{}
err = proto.Unmarshal(message[i+4:i+msgLen+4], clientMsg)
i += msgLen + 4
if err != nil {
log.Warningf("[%s] Couldn't decode protobuf msg!", client.socket.RemoteAddr())
continue
}
switch msg := clientMsg.Msg.(type) {
case *ledd.ClientWrapperMessage_MClient:
client.platform = msg.MClient.Type
log.Infof("[%s] %s is now identified as client (%s)", LOG_CLIENTS, client.socket.RemoteAddr(), client.platform)
clientManager.register <- client
case *ledd.ClientWrapperMessage_MGetLed:
allLED := make([]*ledd.LED, 0)
for _, led := range ledManager.leds {
allLED = append(allLED, &ledd.LED{Name: led.Name})
}
data, err := proto.Marshal(&ledd.ClientWrapperMessage{Leds: allLED})
if err != nil {
log.Errorf("[%s] Error encoding protobuf: %s", client.socket.RemoteAddr(), err)
break
}
client.data <- prepareProtobuf(data)
case *ledd.ClientWrapperMessage_MAddLed:
backend, ok := backManager.backends[msg.MAddLed.Backend]
if !ok {
log.Warningf("[%s] Can't add LED for non-existing backend %s", client.socket.RemoteAddr(), msg.MAddLed.Backend)
break
}
if _, ok := ledManager.leds[msg.MAddLed.Name]; ok {
log.Warningf("[%s] Can't add LED with exisiting name %s", client.socket.RemoteAddr(), msg.MAddLed.Name)
break
}
nLED := &LED{
Name: msg.MAddLed.Name,
Channel: msg.MAddLed.Channel,
Backend: backend.name,
color: make(chan colorful.Color, 20),
}
ledManager.add <- nLED
case *ledd.ClientWrapperMessage_MSetLed:
leds := clientMsg.Leds
if len(leds) == 0 {
log.Warningf("[%s] Got setLED with no LEDs attached!", client.socket.RemoteAddr())
break
}
for _, pLED := range leds {
led, ok := ledManager.leds[pLED.Name]
if !ok {
log.Warningf("[%s] Failed to set %s: not found", client.socket.RemoteAddr(), pLED.Name)
break
}
// log.Debugf("[%s] Set %s to %s", client.socket.RemoteAddr(), led.Name, colorful.Hcl(msg.MSetLed.Colour.Hue, msg.MSetLed.Colour.Chroma, msg.MSetLed.Colour.Light))
if pLED.Color == nil {
led.color <- colorful.Hcl(msg.MSetLed.Colour.Hue, msg.MSetLed.Colour.Chroma, msg.MSetLed.Colour.Light)
} else {
led.color <- colorful.Hcl(pLED.Color.Hue, pLED.Color.Chroma, pLED.Color.Light)
}
}
case *ledd.ClientWrapperMessage_MSetDirect:
backend, ok := backManager.backends[msg.MSetDirect.Backend]
if !ok {
log.Warningf("[%s] Can't set channel for non-existing backend %s", client.socket.RemoteAddr(), msg.MSetDirect.Backend)
break
}
backend.setChannel(msg.MSetDirect.Channel, msg.MSetDirect.Value)
case *ledd.ClientWrapperMessage_MRemoveLed:
led, ok := ledManager.leds[msg.MRemoveLed.Name]
if !ok {
log.Warningf("[%s] Failed to remove %s: not found", client.socket.RemoteAddr(), msg.MRemoveLed.Name)
break
}
ledManager.remove <- led
}
}
}
}
}
// HELPER
func check(e error) {
if e != nil {
panic(e)
}
}
func (backend *Backend) niceName() string {
if backend.name != "" {
return backend.name
} else {
return backend.socket.RemoteAddr().String()
}
}
func (backend Backend) setChannel(channel int32, val int32) {
cMap := make(map[int32]int32)
cMap[channel] = val
wrapperMsg := &ledd.BackendWrapperMessage{
Msg: &ledd.BackendWrapperMessage_MSetChannel{
MSetChannel: &ledd.BackendSetChannel{
NewChannelValues: cMap}}}
data, err := proto.Marshal(wrapperMsg)
if err != nil {
log.Warningf("[%s] Failed to encode protobuf msg: %s", backend.niceName(), err)
}
backend.data <- prepareProtobuf(data)
}
func prepareProtobuf(data []byte) []byte {
size := make([]byte, 4)
binary.BigEndian.PutUint32(size, uint32(len(data)))
return append(size, data...)
}
// MAIN
func main() {
killSignals := make(chan os.Signal, 1)
signal.Notify(killSignals, syscall.SIGINT, syscall.SIGTERM)
log.Info("LedD", VERSION)
content, err := ioutil.ReadFile("ledd.yaml")
check(err)
err = yaml.Unmarshal(content, &config)
check(err)
session, err := mgo.Dial(fmt.Sprintf("%s:%d", config.Mongodb.Host, config.Mongodb.Port))
check(err)
defer session.Close()
LEDCollection = session.DB(config.Mongodb.Database).C("led")
backManager = BackendManager{
backends: make(map[string]*Backend),
broadcast: make(chan []byte, 20),
register: make(chan *Backend, 10),
unregister: make(chan *Backend, 10),
}
go backManager.start()
clientManager = ClientManager{
clients: make(map[*Client]bool),
broadcast: make(chan []byte, 20),
register: make(chan *Client, 10),
unregister: make(chan *Client, 10),
}
go clientManager.start()
ledManager = LEDManager{
leds: make(map[string]*LED),
broadcast: make(chan colorful.Color, 10),
add: make(chan *LED, 10),
remove: make(chan *LED, 10),
}
go ledManager.start()
var dbLEDs = make([]LED, 0)
err = LEDCollection.Find(nil).All(&dbLEDs)
if err != nil {
log.Notice("Failed to load LEDs from db. If there should be LEDs, check db connection")
}
for _, l := range dbLEDs {
ledManager.add <- &l
}
backendThread, err := setupSocket(config.Daemon.Backend.Host, config.Daemon.Backend.Port, LOG_BACKEND, true)
check(err)
go backendThread()
frontendThread, err := setupSocket(config.Daemon.Frontend.Host, config.Daemon.Frontend.Port, LOG_CLIENTS, false)
check(err)
go frontendThread()
log.Infof("All connection handler ready.")
<-killSignals
backManager.stop()
}