added command line options

added reset and pwm frequency methods
This commit is contained in:
Giovanni Harting
2015-09-12 20:48:43 +02:00
parent d6f198201d
commit 1623bba1fe
6 changed files with 223 additions and 91 deletions

129
ledd.py Normal file
View File

@@ -0,0 +1,129 @@
# LEDD Project
# Copyright (C) 2015 LEDD Team
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""LedD Daemon
Usage:
ledd.py [--daemon] [-d | --debug] [-v | --verbose]
ledd.py -h | --help
ledd.py --version
Options:
-h --help Show this screen.
--version Show version.
-d --debug Show debug output. (not recommended for daily use)
-v --verbose Be verbose.
--daemon Run in daemon mode.
"""
import logging
import sys
import os
from pkgutil import iter_modules
from docopt import docopt
import ledd.daemon
import ledd
if "smbus" not in (name for loader, name, ispkg in iter_modules()):
print("smbus not found, installing replacement")
class SMBus:
def __init__(self, i2c_address):
self.i2c_address = i2c_address
self.channels = {}
def write_word_data(self, addr, cmd, val):
if (cmd - 6) % 4 == 0:
self.channels[(cmd - 6) // 4] = val
def read_word_data(self, addr, cmd):
if (cmd - 8) // 4 not in self.channels:
self.channels[(cmd - 8) // 4] = 0
return self.channels[(cmd - 8) // 4]
class SMBusModule:
SMBus = SMBus
sys.modules['smbus'] = SMBusModule
sys.modules['smbus'].SMBus = SMBus
def pid_exists(processid):
if processid < 0:
return False
try:
os.kill(processid, 0)
except ProcessLookupError:
return False
except PermissionError:
return True
else:
return True
if __name__ == "__main__":
arguments = docopt(__doc__, version='LedD Daemon ' + ledd.VERSION)
lvl = logging.WARNING
if arguments['--verbose']:
lvl = logging.INFO
if arguments['--debug']:
lvl = logging.DEBUG
logging.basicConfig(level=lvl,
format="[%(asctime)s] %(levelname)s [%(name)s.%(funcName)s:%(lineno)d] %(message)s",
datefmt="%H:%M:%S")
log = logging.getLogger(__name__)
try:
with open('ledd.pid', 'r') as f:
spid = f.read()
if spid:
if pid_exists(int(spid)):
log.fatal("A instance of the program is already running, exiting...")
sys.exit(5)
else:
log.warning("Found stale pid file, assuming unclean shutdown.")
except FileNotFoundError:
pass
if arguments['--daemon']:
wdir = os.path.dirname(os.path.realpath(__file__))
try:
pid = os.fork()
if pid == 0:
os.setsid()
pid2 = os.fork()
if pid2 == 0:
os.umask(0)
os.chdir(wdir)
with open("ledd.pid", 'w') as pidf:
pidf.write(str(os.getpid()) + '\n')
daemon = ledd.daemon.Daemon()
else:
sys.exit()
else:
sys.exit()
except OSError as e:
log.fatal("Start failed: %s", e)
else:
daemon = ledd.daemon.Daemon()

View File

@@ -15,6 +15,8 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from json import JSONEncoder
import logging
import time
import smbus
@@ -27,7 +29,6 @@ PCA9685_SUBADR3 = 0x4
PCA9685_MODE1 = 0x00
PCA9685_MODE2 = 0x01
PCA9685_PRESCALE = 0xFE
PCA9685_RESET = 0xFE
LED0_ON_L = 0x06
LED0_ON_H = 0x07
@@ -56,7 +57,8 @@ class Controller:
l = []
cur = db.cursor()
for row in cur.execute("select * from controller"):
l.append(Controller.from_row(db, row))
c = Controller.from_row(db, row)
l.append(c)
cur.close()
return l
@@ -64,24 +66,27 @@ class Controller:
cur = self.db.cursor()
if self.id == -1:
cur.execute("INSERT INTO controller (pwm_freq, channels, i2c_device, address) VALUES (?,?,?,?)",
(self.pwm_freq, self.channels, self.i2c_device, self.address))
(self._pwm_freq, self.channels, self.i2c_device, self.address))
self.id = cur.lastrowid
else:
cur.execute("UPDATE controller SET pwm_freq=?, channels=?, i2c_device=?, address=? WHERE id = ?",
(self.pwm_freq, self.channels, self.i2c_device, self.address, self.id))
(self._pwm_freq, self.channels, self.i2c_device, self.address, self.id))
cur.close()
self.db.commit()
def __init__(self, db, channels, i2c_device, address, pwm_freq=-1, cid=-1, from_db=False):
self.pwm_freq = pwm_freq
def __init__(self, db, channels, i2c_device, address, pwm_freq=1526, cid=-1, from_db=False):
self._mode = None
self.channels = channels
self.i2c_device = i2c_device
self.bus = smbus.SMBus(i2c_device)
self.address = address
self._address = int(address, 16)
self.id = cid
self.db = db
self.stripes = []
self.load_stripes()
self._pwm_freq = None
self.pwm_freq = pwm_freq
if not from_db:
self.save_to_db()
@@ -89,21 +94,56 @@ class Controller:
cur = self.db.cursor()
for stripe in cur.execute("select * from stripes where controller_id = ?", (self.id,)):
self.stripes.append(Stripe.from_db(self, stripe))
logging.getLogger(__name__).debug("Loaded %s stripes for controller %s", len(self.stripes), self.id)
cur.close()
def __repr__(self):
return "<Controller stripes={} cid={}>".format(len(self.stripes), self.id)
def set_channel(self, channel, val):
self.bus.write_word_data(int(self.address, 16), LED0_OFF_L + 4 * channel, int(val * 4095))
self.bus.write_word_data(int(self.address, 16), LED0_ON_L + 4 * channel, 0)
self.bus.write_word_data(self._address, LED0_OFF_L + 4 * channel, int(val * 4095))
self.bus.write_word_data(self._address, LED0_ON_L + 4 * channel, 0)
def get_channel(self, channel):
return self.bus.read_word_data(int(self.address, 16), LED0_OFF_L + 4 * channel) / 4095
return self.bus.read_word_data(self._address, LED0_OFF_L + 4 * channel) / 4095
def add_stripe(self, stripe):
self.stripes.append(stripe)
def reset(self):
self.mode = int("0b00100001", 2) # MODE1 -> 0b00000001
time.sleep(0.015)
self.mode = int("0b10100001", 2)
@property
def mode(self):
self._mode = self.bus.read_byte_data(self._address, PCA9685_MODE1)
logging.getLogger(__name__).debug("Controller mode: %s", bin(self._mode))
return self._mode
@mode.setter
def mode(self, mode):
self.bus.write_byte_data(self._address, PCA9685_MODE1, mode)
self._mode = mode
logging.getLogger(__name__).debug("Controller mode: %s", bin(self._mode))
@property
def pwm_freq(self):
self._pwm_freq = (self.bus.read_byte_data(self._address, PCA9685_PRESCALE) + 1) / 4096 * 25000000
return self._pwm_freq
@pwm_freq.setter
def pwm_freq(self, value):
if value < 24 or value > 1526:
raise ValueError("PWM frequency must be 24Hz <= pwm_freq <= 1526Hz: {}".format(value))
prescal = round((25000000.0 / (4096.0 * value))) - 1
logging.getLogger(__name__).debug("Presacle value: %s", prescal)
self.mode = int("0b00110001", 2)
self.bus.write_byte_data(self._address, PCA9685_PRESCALE, prescal)
self.reset()
self._pwm_freq = value
class ControllerEncoder(JSONEncoder):
def default(self, o):
@@ -114,5 +154,14 @@ class ControllerEncoder(JSONEncoder):
'channel': o.channels,
'address': o.address,
'stripes': o.stripes,
'i2c_device': o.i2c_device
'cstripes': len(o.stripes),
'i2c_device': o.i2c_device,
'mode': o.mode
}
elif isinstance(o, Stripe):
return {
'id': o.id,
'name': o.name,
'rgb': o.rgb,
'channel': o.channels
}

View File

@@ -23,6 +23,8 @@ import sys
import traceback
import time
import asyncio
import signal
import spectra
from ledd import controller, VERSION
@@ -54,6 +56,7 @@ class Daemon:
self.config.read_file(f)
except FileNotFoundError:
log.info("No config file found!")
pass
# SQL init
self.sqldb = sqlite3.connect(self.config.get(self.databaseSection, 'name', fallback='ledd.sqlite'))
@@ -67,7 +70,13 @@ class Daemon:
# init controllers from db
self.controllers = controller.Controller.from_db(self.sqldb)
log.debug(self.controllers)
logging.getLogger("asyncio").setLevel(logging.DEBUG)
logging.getLogger("asyncio").setLevel(log.getEffectiveLevel())
# sigterm handler
def sigterm_handler(_signo, _stack_frame):
sys.exit(0)
signal.signal(signal.SIGTERM, sigterm_handler)
# main loop
self.loop = asyncio.get_event_loop()
@@ -78,6 +87,10 @@ class Daemon:
self.loop.run_forever()
except (KeyboardInterrupt, SystemExit):
log.info("Exiting")
try:
os.remove("ledd.pid")
except FileNotFoundError:
pass
self.sqldb.close()
self.server.close()
self.loop.run_until_complete(self.server.wait_closed())
@@ -208,7 +221,7 @@ class Daemon:
def find_stripe(self, sid):
"""
Finds a given stripeid in the currently known controllers
:param jstripe: json containing sid
:param sid stripe id
:return: stripe if found or none
:rtype: ledd.Stripe | None
"""
@@ -298,6 +311,7 @@ class Daemon:
if "stripes" in req_json:
for stripe in req_json['stripes']:
c = next((x for x in self.controllers if x.id == stripe['cid']), None)
""" :type c: ledd.controller.Controller """
if c is None:
res_stripes.append({
@@ -310,6 +324,9 @@ class Daemon:
s = Stripe(c, stripe['name'], stripe['rgb'],
(stripe['map']['r'], stripe['map']['g'], stripe['map']['b']))
c.stripes.append(s)
log.debug("Added stripe %s to controller %s; new len %s", c.id, s.id, len(c.stripes))
res_stripes.append({
'success': True,
'sid': s.id,
@@ -325,9 +342,9 @@ class Daemon:
return json.dumps(rjson)
@ledd_protocol(protocol)
def get_controllers(self, req_json):
def get_stripes(self, req_json):
"""
Part of the Color API. Used to get all registered controllers known to the daemon.
Part of the Color API. Used to get all registered stripes known to the daemon.
Required JSON parameters: none
:param req_json: dict of request json
"""
@@ -398,17 +415,22 @@ class LedDProtocol(asyncio.Protocol):
transport = None
def connection_made(self, transport):
log.info("New connection from %s", transport.get_extra_info("peername"))
log.debug("New connection from %s", transport.get_extra_info("peername"))
self.transport = transport
def data_received(self, data):
log.info("Received: %s from: %s", data.decode(), self.transport.get_extra_info("peername"))
self.select_task(data)
try:
d_decoded = data.decode()
except UnicodeDecodeError:
log.warning("Recieved undecodable data, ignoring")
else:
log.debug("Received: %s from: %s", d_decoded, self.transport.get_extra_info("peername"))
self.select_task(d_decoded)
def select_task(self, data):
if data:
try:
json_decoded = json.loads(data.decode())
json_decoded = json.loads(data)
if "action" in json_decoded and "ref" in json_decoded:
return_data = Daemon.instance.protocol.get(json_decoded['action'], Daemon.no_action_found)(

View File

@@ -49,6 +49,9 @@ class Stripe:
c = Color("rgb", rc[0], rc[1], rc[2])
self._color = c.to("hsv")
def __repr__(self):
return "<Stripe id={}>".format(self.id)
@classmethod
def from_db(cls, controller, row):
return cls(controller, name=row["name"], rgb=row["rgb"],

View File

@@ -21,10 +21,10 @@ setup(name='LedD',
description='Providing control for led stripes.',
url='https://github.com/LED-Freaks/LedD',
author='IdleGandalf, Lauch',
author_email='539@idlegandalf.com',
author_email='ledd@idlegandalf.com',
license='GPLv3',
packages=['ledd'],
install_requires=[
'nose', 'spectra',
'nose', 'spectra', 'docopt',
],
zip_safe=False)

View File

@@ -1,71 +0,0 @@
# LEDD Project
# Copyright (C) 2015 LEDD Team
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
import ledd.daemon
import sys
import os
from pkgutil import iter_modules
if "smbus" not in (name for loader, name, ispkg in iter_modules()):
print("smbus not found, installing replacement")
class SMBus:
def __init__(self, i2c_address):
self.i2c_address = i2c_address
self.channels = {}
def write_word_data(self, addr, cmd, val):
if (cmd - 6) % 4 == 0:
self.channels[(cmd - 6) // 4] = val
def read_word_data(self, addr, cmd):
if (cmd - 8) // 4 not in self.channels:
self.channels[(cmd - 8) // 4] = 0
return self.channels[(cmd - 8) // 4]
class SMBusModule:
SMBus = SMBus
sys.modules['smbus'] = SMBusModule
sys.modules['smbus'].SMBus = SMBus
if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG,
format="[%(asctime)s] %(levelname)s [%(name)s.%(funcName)s:%(lineno)d] %(message)s",
datefmt="%H:%M:%S")
log = logging.getLogger(__name__)
wdir = os.path.dirname(os.path.realpath(__file__))
try:
pid = os.fork()
if pid == 0:
os.setsid()
pid2 = os.fork()
if pid2 == 0:
os.umask(0)
os.chdir(wdir)
daemon = ledd.daemon.Daemon()
else:
sys.exit()
else:
sys.exit()
except OSError as e:
log.fatal("Start failed: %s", e)