added test_channel function

added json encoder for controller
This commit is contained in:
Giovanni Harting
2015-07-18 02:10:36 +02:00
parent 288f673cee
commit 4c23abc4e8
2 changed files with 52 additions and 5 deletions

View File

@@ -1,6 +1,9 @@
from json import JSONEncoder
import smbus
from colour import Color
PCA9685_SUBADR1 = 0x2
PCA9685_SUBADR2 = 0x3
PCA9685_SUBADR3 = 0x4
@@ -44,12 +47,12 @@ class Controller:
def save_to_db(self):
cur = self.db.cursor()
if self.id == -1:
cur.execute("INSERT INTO controller (pwm_freq, channels, i2c_device, address) VALUES ()",
cur.execute("INSERT INTO controller (pwm_freq, channels, i2c_device, address) VALUES (?,?,?,?)",
(self.pwm_freq, self.channels, self.i2c_device, self.address))
self.id = cur.lastrowid
else:
cur.execute("UPDATE controller SET pwm_freq=?, channels=?, i2c_device=?, address=? WHERE id = ?",
(self.pwm_freq, self.channels, self.i2c_device, self.address, self.id))
(self.pwm_freq, self.channels, self.i2c_device, self.address, self.id))
cur.close()
self.db.commit()
@@ -76,8 +79,9 @@ class Controller:
return "<Controller stripes={} cid={}>".format(len(self.stripes), self.id)
def set_channel(self, channel, val):
self.bus.write_word_data(self.address, LED0_OFF_L + 4 * channel, val * 4095)
self.bus.write_word_data(self.address, LED0_ON_L + 4 * channel, 0)
print(type(LED0_OFF_L + 4 * channel), type(val), channel, val, type(self.address))
self.bus.write_word_data(int(self.address, 16), LED0_OFF_L + 4 * channel, int(val * 4095))
self.bus.write_word_data(int(self.address, 16), LED0_ON_L + 4 * channel, 0)
def get_channel(self, channel):
return self.bus.read_word_data(self.address, LED0_OFF_L + 4 * channel)
@@ -131,3 +135,16 @@ class Stripe:
return self._color
color = property(get_color, set_color)
class ControllerEncoder(JSONEncoder):
def default(self, o):
if isinstance(o, Controller):
return {
'id': o.id,
'pwm_freq': o.pwm_freq,
'channel': o.channels,
'address': o.address,
'stripes': o.stripes,
'i2c_device': o.i2c_device
}

View File

@@ -22,6 +22,8 @@ import sqlite3
import os
import sys
import traceback
import time
from . import controller
@@ -104,6 +106,7 @@ class Daemon:
if "action" in json_decoded:
if json_decoded['action'] == "set_color":
# TODO: add adapter setting stripe with color here
print("recieved action: {}".format(json_decoded['action']))
elif json_decoded['action'] == "add_controller":
@@ -112,8 +115,9 @@ class Daemon:
try:
ncontroller = controller.Controller(Daemon.instance.sqldb, json_decoded['channels'],
json_decoded['i2c_dev'], json_decoded['address'])
except OSError as e:
except OSError:
print("Error opening i2c device!")
self.send("{}\n".format(ncontroller.id).encode())
Daemon.instance.controllers.append(ncontroller)
elif json_decoded['action'] == "get_color":
@@ -124,10 +128,36 @@ class Daemon:
for stripe in json_decoded['stripes']:
# TODO: add stripe here
print(len(json_decoded['stripes']))
elif json_decoded['action'] == "get_controllers":
rjson = {
'status': 'success',
'ccount': len(Daemon.instance.controllers),
'controller': Daemon.instance.controllers
}
self.send("{}\n".format(json.dumps(rjson, cls=controller.ControllerEncoder)).encode())
elif json_decoded['action'] == "connection_check":
result = next(filter(lambda x: x.id == json_decoded['id'], Daemon.instance.controllers),
None)
""" :type : Controller """
if result is not None:
print("we can do it!")
for i in range(result.channels):
print("set channel {}={}".format(i, "1"))
result.set_channel(i, 1)
time.sleep(2)
result.set_channel(i, 0)
rjson = {
'status': 'success'
}
self.send("{}\n".format(json.dumps(rjson, cls=controller.ControllerEncoder)).encode())
else:
print("no action found, ignoring")
except TypeError as e:
print("No valid JSON found: {}".format(e))
traceback.print_exc(file=sys.stdout)
except ValueError:
print("No valid JSON detected!")
traceback.print_exc(file=sys.stdout)