From ebf7871bdcc52d396d78596ce218b9a366acde70 Mon Sep 17 00:00:00 2001 From: Giovanni Harting Date: Fri, 28 Aug 2015 00:43:01 +0200 Subject: [PATCH] switched to asyncio --- ledd/daemon.py | 137 ++++++++++++++++++++++++------------------------- start.py | 16 ++---- 2 files changed, 71 insertions(+), 82 deletions(-) diff --git a/ledd/daemon.py b/ledd/daemon.py index bd9a485..bbdb75c 100644 --- a/ledd/daemon.py +++ b/ledd/daemon.py @@ -14,8 +14,7 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -import asyncore -import socket +import logging import configparser import json import sqlite3 @@ -23,34 +22,33 @@ import os import sys import traceback import time -import logging -from multiprocessing import Process - -import nose +import asyncio from ledd import controller, VERSION from ledd.decorators import add_action +log = logging.getLogger(__name__) + class Daemon: daemonSection = 'daemon' databaseSection = 'db' instance = None """:type : Daemon """ + loop = None + """ :type : asyncio.BaseEventLoop """ action_dict = {} def __init__(self): Daemon.instance = self - logging.basicConfig(level=logging.DEBUG, - format="[%(asctime)s] %(levelname)s [%(name)s.%(funcName)s:%(lineno)d] %(message)s", - datefmt="%H:%M:%S") + try: self.config = configparser.ConfigParser() try: with open('ledd.config', 'w+') as f: self.config.read_file(f) except FileNotFoundError: - logging.info("No config file found!") + log.info("No config file found!") self.sqldb = sqlite3.connect(self.config.get(self.databaseSection, 'name', fallback='ledd.sqlite')) self.sqldb.row_factory = sqlite3.Row @@ -61,14 +59,21 @@ class Daemon: self.sqldb.commit() self.controllers = controller.Controller.from_db(self.sqldb) - logging.debug(self.controllers) + log.debug(self.controllers) - server = self.SocketServer(self.config.get(self.daemonSection, 'host', fallback='0.0.0.0'), - self.config.get(self.daemonSection, 'port', fallback=1425)) - asyncore.loop() + logging.getLogger("asyncio").setLevel(logging.DEBUG) + + self.loop = asyncio.get_event_loop() + self.loop.set_debug(True) + coro = self.loop.create_server(LedDProtocol, + self.config.get(self.daemonSection, 'host', fallback='0.0.0.0'), + self.config.get(self.daemonSection, 'port', fallback=1425)) + self.loop.run_until_complete(coro) + self.loop.run_forever() except (KeyboardInterrupt, SystemExit): - logging.info("Exiting") + log.info("Exiting") self.sqldb.close() + self.loop.close() sys.exit(0) def check_db(self): @@ -84,7 +89,7 @@ class Daemon: c.close() if db_version is not None: - logging.info("DB connection established; db-version=%s", db_version[0]) + log.info("DB connection established; db-version=%s", db_version[0]) return True else: return False @@ -112,7 +117,7 @@ class Daemon: :param req_json: dict of request json """ # TODO: add adapter setting stripe with color here - logging.debug("recieved action: %s", req_json['action']) + log.debug("recieved action: %s", req_json['action']) @add_action(action_dict) def add_controller(self, req_json): @@ -122,12 +127,12 @@ class Daemon: address: hexdecimal address of controller on i2c bus, e.g. 0x40 :param req_json: dict of request json """ - logging.debug("recieved action: %s", req_json['action']) + log.debug("recieved action: %s", req_json['action']) try: ncontroller = controller.Controller(Daemon.instance.sqldb, req_json['channels'], req_json['i2c_dev'], req_json['address']) except OSError as e: - logging.error("Error opening i2c device: %s", req_json['i2c_dev']) + log.error("Error opening i2c device: %s", req_json['i2c_dev']) rjson = { 'success': False, 'message': "Error while opening i2c device", @@ -153,7 +158,7 @@ class Daemon: Required JSON parameters: stripeid: sid :param req_json: dict of request json """ - logging.debug("recieved action: %s", req_json['action']) + log.debug("recieved action: %s", req_json['action']) # TODO: Add get color logic @add_action(action_dict) @@ -163,11 +168,11 @@ class Daemon: Required JSON parameters: :param req_json: dict of request json """ - logging.debug("recieved action: %s", req_json['action']) + log.debug("recieved action: %s", req_json['action']) if "stripes" in req_json: for stripe in req_json['stripes']: # TODO: add stripe here - logging.debug(len(req_json['stripes'])) + log.debug(len(req_json['stripes'])) @add_action(action_dict) def get_controllers(self, req_json): @@ -176,7 +181,7 @@ class Daemon: Required JSON parameters: none :param req_json: dict of request json """ - logging.debug("recieved action: %s", req_json['action']) + log.debug("recieved action: %s", req_json['action']) rjson = { 'success': True, @@ -194,14 +199,14 @@ class Daemon: Required JSON parameters: controller id: cid :param req_json: dict of request json """ - logging.debug("recieved action: %s", req_json['action']) + log.debug("recieved action: %s", req_json['action']) result = next(filter(lambda x: x.id == req_json['cid'], self.controllers), None) """ :type : Controller """ if result is not None: for i in range(result.channels): - logging.debug("set channel %d=%s", i, "1") + log.debug("set channel %d=%s", i, "1") result.set_channel(i, 1) time.sleep(10) result.set_channel(i, 0) @@ -220,7 +225,7 @@ class Daemon: Required JSON parameters: none :param req_json: dict of request json """ - logging.debug("recieved action: %s", req_json['action']) + log.debug("recieved action: %s", req_json['action']) rjson = { 'success': True, @@ -230,56 +235,46 @@ class Daemon: return json.dumps(rjson) - class ConnectionHandler(asyncore.dispatcher_with_send): - def handle_read(self): - data = self.recv(5120) - self.debug = True - def no_action_found(self, req_json): - rjson = { - 'success': False, - 'message': "No action found", - 'ref': req_json['ref'] - } - return json.dumps(rjson) +class LedDProtocol(asyncio.Protocol): + transport = None - if data: - try: - json_decoded = json.loads(data.decode()) - logging.debug(json.dumps(json_decoded, sort_keys=True)) + def connection_made(self, transport): + log.info("New connection from %s", transport.get_extra_info("peername")) + self.transport = transport - if "action" in json_decoded and "ref" in json_decoded: - return_data = Daemon.instance.action_dict.get(json_decoded['action'], no_action_found)( - self=Daemon.instance, - req_json=json_decoded) + def data_received(self, data): + log.info("Received: %s", data.decode()) + self.select_task(data) - if return_data is not None: - self.send("{}\n".format(return_data).encode()) - else: - logging.warning("no action or ref value found in JSON, ignoring") - except TypeError: - logging.error("No valid JSON found: %s", traceback.format_exc()) - except ValueError: - logging.error("No valid JSON detected: %s", traceback.format_exc()) + def select_task(self, data): + if data: + try: + json_decoded = json.loads(data.decode()) - class SocketServer(asyncore.dispatcher): - def __init__(self, host, port): - asyncore.dispatcher.__init__(self) - self.create_socket(socket.AF_INET, socket.SOCK_STREAM) - self.set_reuse_addr() - self.bind((host, port)) - self.listen(5) + if "action" in json_decoded and "ref" in json_decoded: + return_data = Daemon.instance.action_dict.get(json_decoded['action'], self.no_action_found)( + json_decoded) - p = Process(target=self.run_tests) - p.start() + if return_data is not None: + self.transport.write("{}\n".format(return_data).encode()) + else: + log.debug("no action or ref value found in JSON, ignoring") + except TypeError: + log.debug("No valid JSON found: %s", traceback.format_exc()) + except ValueError: + log.debug("No valid JSON detected: %s", traceback.format_exc()) - @staticmethod - def run_tests(): - nose.run() + @staticmethod + def no_action_found(req_json): + rjson = { + 'success': False, + 'message': "No action found", + 'ref': req_json['ref'] + } + return json.dumps(rjson) - def handle_accept(self): - pair = self.accept() - if pair is not None: - sock, addr = pair - logging.debug('Incoming connection from %s' % repr(addr)) - handler = Daemon.ConnectionHandler(sock) + def connection_lost(self, exc): + # The socket has been closed, stop the event loop + # Daemon.loop.stop() + log.info("Lost connection to %s", self.transport.get_extra_info("peername")) diff --git a/start.py b/start.py index bca150a..c12eb3c 100644 --- a/start.py +++ b/start.py @@ -34,21 +34,15 @@ if "smbus" not in (name for loader, name, ispkg in iter_modules()): def read_word_data(self, cmd): return self.channels[(cmd - 8) / 4] + import sys sys.modules['smbus'] = SMBus import ledd.daemon if __name__ == "__main__": - log = logging.getLogger("") - formatter = logging.Formatter("%(asctime)s %(levelname)s " + - "[%(module)s:%(lineno)d] %(message)s") - # setup console logging - log.setLevel(logging.DEBUG) - ch = logging.StreamHandler() - ch.setLevel(logging.DEBUG) - - ch.setFormatter(formatter) - log.addHandler(ch) - + logging.basicConfig(level=logging.DEBUG, + format="[%(asctime)s] %(levelname)s [%(name)s.%(funcName)s:%(lineno)d] %(message)s", + datefmt="%H:%M:%S") + log = logging.getLogger(__name__) daemon = ledd.daemon.Daemon()