switched to asyncio

This commit is contained in:
Giovanni Harting
2015-08-28 00:43:01 +02:00
parent d64aef6447
commit ebf7871bdc
2 changed files with 71 additions and 82 deletions

View File

@@ -14,8 +14,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import asyncore
import socket
import logging
import configparser
import json
import sqlite3
@@ -23,34 +22,33 @@ import os
import sys
import traceback
import time
import logging
from multiprocessing import Process
import nose
import asyncio
from ledd import controller, VERSION
from ledd.decorators import add_action
log = logging.getLogger(__name__)
class Daemon:
daemonSection = 'daemon'
databaseSection = 'db'
instance = None
""":type : Daemon """
loop = None
""" :type : asyncio.BaseEventLoop """
action_dict = {}
def __init__(self):
Daemon.instance = self
logging.basicConfig(level=logging.DEBUG,
format="[%(asctime)s] %(levelname)s [%(name)s.%(funcName)s:%(lineno)d] %(message)s",
datefmt="%H:%M:%S")
try:
self.config = configparser.ConfigParser()
try:
with open('ledd.config', 'w+') as f:
self.config.read_file(f)
except FileNotFoundError:
logging.info("No config file found!")
log.info("No config file found!")
self.sqldb = sqlite3.connect(self.config.get(self.databaseSection, 'name', fallback='ledd.sqlite'))
self.sqldb.row_factory = sqlite3.Row
@@ -61,14 +59,21 @@ class Daemon:
self.sqldb.commit()
self.controllers = controller.Controller.from_db(self.sqldb)
logging.debug(self.controllers)
log.debug(self.controllers)
server = self.SocketServer(self.config.get(self.daemonSection, 'host', fallback='0.0.0.0'),
self.config.get(self.daemonSection, 'port', fallback=1425))
asyncore.loop()
logging.getLogger("asyncio").setLevel(logging.DEBUG)
self.loop = asyncio.get_event_loop()
self.loop.set_debug(True)
coro = self.loop.create_server(LedDProtocol,
self.config.get(self.daemonSection, 'host', fallback='0.0.0.0'),
self.config.get(self.daemonSection, 'port', fallback=1425))
self.loop.run_until_complete(coro)
self.loop.run_forever()
except (KeyboardInterrupt, SystemExit):
logging.info("Exiting")
log.info("Exiting")
self.sqldb.close()
self.loop.close()
sys.exit(0)
def check_db(self):
@@ -84,7 +89,7 @@ class Daemon:
c.close()
if db_version is not None:
logging.info("DB connection established; db-version=%s", db_version[0])
log.info("DB connection established; db-version=%s", db_version[0])
return True
else:
return False
@@ -112,7 +117,7 @@ class Daemon:
:param req_json: dict of request json
"""
# TODO: add adapter setting stripe with color here
logging.debug("recieved action: %s", req_json['action'])
log.debug("recieved action: %s", req_json['action'])
@add_action(action_dict)
def add_controller(self, req_json):
@@ -122,12 +127,12 @@ class Daemon:
address: hexdecimal address of controller on i2c bus, e.g. 0x40
:param req_json: dict of request json
"""
logging.debug("recieved action: %s", req_json['action'])
log.debug("recieved action: %s", req_json['action'])
try:
ncontroller = controller.Controller(Daemon.instance.sqldb, req_json['channels'],
req_json['i2c_dev'], req_json['address'])
except OSError as e:
logging.error("Error opening i2c device: %s", req_json['i2c_dev'])
log.error("Error opening i2c device: %s", req_json['i2c_dev'])
rjson = {
'success': False,
'message': "Error while opening i2c device",
@@ -153,7 +158,7 @@ class Daemon:
Required JSON parameters: stripeid: sid
:param req_json: dict of request json
"""
logging.debug("recieved action: %s", req_json['action'])
log.debug("recieved action: %s", req_json['action'])
# TODO: Add get color logic
@add_action(action_dict)
@@ -163,11 +168,11 @@ class Daemon:
Required JSON parameters:
:param req_json: dict of request json
"""
logging.debug("recieved action: %s", req_json['action'])
log.debug("recieved action: %s", req_json['action'])
if "stripes" in req_json:
for stripe in req_json['stripes']:
# TODO: add stripe here
logging.debug(len(req_json['stripes']))
log.debug(len(req_json['stripes']))
@add_action(action_dict)
def get_controllers(self, req_json):
@@ -176,7 +181,7 @@ class Daemon:
Required JSON parameters: none
:param req_json: dict of request json
"""
logging.debug("recieved action: %s", req_json['action'])
log.debug("recieved action: %s", req_json['action'])
rjson = {
'success': True,
@@ -194,14 +199,14 @@ class Daemon:
Required JSON parameters: controller id: cid
:param req_json: dict of request json
"""
logging.debug("recieved action: %s", req_json['action'])
log.debug("recieved action: %s", req_json['action'])
result = next(filter(lambda x: x.id == req_json['cid'], self.controllers), None)
""" :type : Controller """
if result is not None:
for i in range(result.channels):
logging.debug("set channel %d=%s", i, "1")
log.debug("set channel %d=%s", i, "1")
result.set_channel(i, 1)
time.sleep(10)
result.set_channel(i, 0)
@@ -220,7 +225,7 @@ class Daemon:
Required JSON parameters: none
:param req_json: dict of request json
"""
logging.debug("recieved action: %s", req_json['action'])
log.debug("recieved action: %s", req_json['action'])
rjson = {
'success': True,
@@ -230,56 +235,46 @@ class Daemon:
return json.dumps(rjson)
class ConnectionHandler(asyncore.dispatcher_with_send):
def handle_read(self):
data = self.recv(5120)
self.debug = True
def no_action_found(self, req_json):
rjson = {
'success': False,
'message': "No action found",
'ref': req_json['ref']
}
return json.dumps(rjson)
class LedDProtocol(asyncio.Protocol):
transport = None
if data:
try:
json_decoded = json.loads(data.decode())
logging.debug(json.dumps(json_decoded, sort_keys=True))
def connection_made(self, transport):
log.info("New connection from %s", transport.get_extra_info("peername"))
self.transport = transport
if "action" in json_decoded and "ref" in json_decoded:
return_data = Daemon.instance.action_dict.get(json_decoded['action'], no_action_found)(
self=Daemon.instance,
req_json=json_decoded)
def data_received(self, data):
log.info("Received: %s", data.decode())
self.select_task(data)
if return_data is not None:
self.send("{}\n".format(return_data).encode())
else:
logging.warning("no action or ref value found in JSON, ignoring")
except TypeError:
logging.error("No valid JSON found: %s", traceback.format_exc())
except ValueError:
logging.error("No valid JSON detected: %s", traceback.format_exc())
def select_task(self, data):
if data:
try:
json_decoded = json.loads(data.decode())
class SocketServer(asyncore.dispatcher):
def __init__(self, host, port):
asyncore.dispatcher.__init__(self)
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.set_reuse_addr()
self.bind((host, port))
self.listen(5)
if "action" in json_decoded and "ref" in json_decoded:
return_data = Daemon.instance.action_dict.get(json_decoded['action'], self.no_action_found)(
json_decoded)
p = Process(target=self.run_tests)
p.start()
if return_data is not None:
self.transport.write("{}\n".format(return_data).encode())
else:
log.debug("no action or ref value found in JSON, ignoring")
except TypeError:
log.debug("No valid JSON found: %s", traceback.format_exc())
except ValueError:
log.debug("No valid JSON detected: %s", traceback.format_exc())
@staticmethod
def run_tests():
nose.run()
@staticmethod
def no_action_found(req_json):
rjson = {
'success': False,
'message': "No action found",
'ref': req_json['ref']
}
return json.dumps(rjson)
def handle_accept(self):
pair = self.accept()
if pair is not None:
sock, addr = pair
logging.debug('Incoming connection from %s' % repr(addr))
handler = Daemon.ConnectionHandler(sock)
def connection_lost(self, exc):
# The socket has been closed, stop the event loop
# Daemon.loop.stop()
log.info("Lost connection to %s", self.transport.get_extra_info("peername"))

View File

@@ -34,21 +34,15 @@ if "smbus" not in (name for loader, name, ispkg in iter_modules()):
def read_word_data(self, cmd):
return self.channels[(cmd - 8) / 4]
import sys
sys.modules['smbus'] = SMBus
import ledd.daemon
if __name__ == "__main__":
log = logging.getLogger("")
formatter = logging.Formatter("%(asctime)s %(levelname)s " +
"[%(module)s:%(lineno)d] %(message)s")
# setup console logging
log.setLevel(logging.DEBUG)
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
ch.setFormatter(formatter)
log.addHandler(ch)
logging.basicConfig(level=logging.DEBUG,
format="[%(asctime)s] %(levelname)s [%(name)s.%(funcName)s:%(lineno)d] %(message)s",
datefmt="%H:%M:%S")
log = logging.getLogger(__name__)
daemon = ledd.daemon.Daemon()