Files
mumo3/mumo_manager.py
2011-04-16 05:25:59 +02:00

588 lines
21 KiB
Python

#!/usr/bin/env python
# -*- coding: utf-8
# Copyright (C) 2010 Stefan Hacker <dd0t@users.sourceforge.net>
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# - Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# - Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# - Neither the name of the Mumble Developers nor the names of its
# contributors may be used to endorse or promote products derived from this
# software without specific prior written permission.
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# `AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE FOUNDATION OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import Queue
from worker import Worker, local_thread, local_thread_blocking
from config import Config
import sys
import os
class FailedLoadModuleException(Exception):
pass
class FailedLoadModuleConfigException(FailedLoadModuleException):
pass
class FailedLoadModuleImportException(FailedLoadModuleException):
pass
class FailedLoadModuleInitializationException(FailedLoadModuleException):
pass
def debug_log(enable = True):
def new_dec(fu):
def new_fu(*args, **kwargs):
self = args[0]
log = self.log()
skwargs = ','.join(['%s=%s' % (karg,repr(arg)) for karg, arg in kwargs])
sargs = ','.join([str(arg) for arg in args[1:]]) + '' if not skwargs else (',' + str(skwargs))
call = "%s(%s)" % (fu.__name__, sargs)
log.debug(call)
res = fu(*args, **kwargs)
log.debug("%s -> %s", call, repr(res))
return res
return new_fu if enable else fu
return new_dec
debug_me = True
class MumoManagerRemote(object):
"""
Manager object handed to MumoModules. This module
acts as a remote for the MumoModule with which it
can register/unregister to/from callbacks as well
as do other signaling to the master MumoManager.
"""
SERVERS_ALL = [-1] ## Applies to all servers
def __init__(self, master, name, queue):
self.__master = master
self.__name = name
self.__queue = queue
def getQueue(self):
return self.__queue
def subscribeMetaCallbacks(self, handler, servers = SERVERS_ALL):
"""
Subscribe to meta callbacks. Subscribes the given handler to the following
callbacks:
>>> started(self, server, context = None)
>>> stopped(self, server, context = None)
@param servers: List of server IDs for which to subscribe. To subscribe to all
servers pass SERVERS_ALL.
@param handler: Object on which to call the callback functions
"""
return self.__master.subscribeMetaCallbacks(self.__queue, handler, servers)
def unsubscribeMetaCallbacks(self, handler, servers = SERVERS_ALL):
"""
Unsubscribe from meta callbacks. Unsubscribes the given handler from callbacks
for the given servers.
@param servers: List of server IDs for which to unsubscribe. To unsubscribe from all
servers pass SERVERS_ALL.
@param handler: Subscribed handler
"""
return self.__master.unscubscribeMetaCallbacks(self.__queue, handler, servers)
def subscribeServerCallbacks(self, handler, servers = SERVERS_ALL):
"""
Subscribe to server callbacks. Subscribes the given handler to the following
callbacks:
>>> userConnected(self, state, context = None)
>>> userDisconnected(self, state, context = None)
>>> userStateChanged(self, state, context = None)
>>> channelCreated(self, state, context = None)
>>> channelRemoved(self, state, context = None)
>>> channelStateChanged(self, state, context = None)
@param servers: List of server IDs for which to subscribe. To subscribe to all
servers pass SERVERS_ALL.
@param handler: Object on which to call the callback functions
"""
return self.__master.subscribeServerCallbacks(self.__queue, handler, servers)
def unsubscribeServerCallbacks(self, handler, servers = SERVERS_ALL):
"""
Unsubscribe from server callbacks. Unsubscribes the given handler from callbacks
for the given servers.
@param servers: List of server IDs for which to unsubscribe. To unsubscribe from all
servers pass SERVERS_ALL.
@param handler: Subscribed handler
"""
return self.__master.unsubscribeServerCallbacks(self.__queue, handler, servers)
def subscribeContextCallbacks(self, handler, servers = SERVERS_ALL):
"""
Subscribe to context callbacks. Subscribes the given handler to the following
callbacks:
>>> contextAction(self, action, user, session, channelid, context = None)
@param servers: List of server IDs for which to subscribe. To subscribe to all
servers pass SERVERS_ALL.
@param handler: Object on which to call the callback functions
"""
return self.__master.subscribeContextCallbacks(self.__queue, handler, servers)
def unsubscribeContextCallbacks(self, handler, servers = SERVERS_ALL):
"""
Unsubscribe from context callbacks. Unsubscribes the given handler from callbacks
for the given servers.
@param servers: List of server IDs for which to unsubscribe. To unsubscribe from all
servers pass SERVERS_ALL.
@param handler: Subscribed handler
"""
return self.__master.unsubscribeContextCallbacks(self.__queue, handler, servers)
def getMurmurModule(self):
"""
Returns the Murmur module generated from the slice file
"""
return self.__master.getMurmurModule()
def getMeta(self):
"""
Returns the connected servers meta module or None if it is not available
"""
return self.__master.getMeta()
class MumoManager(Worker):
MAGIC_ALL = -1
cfg_default = {'modules':(('mod_dir', str, "modules/"),
('cfg_dir', str, "modules-enabled/"),
('timeout', int, 2))}
def __init__(self, murmur, cfg = Config(default = cfg_default)):
Worker.__init__(self, "MumoManager")
self.queues = {} # {queue:module}
self.modules = {} # {name:module}
self.imports = {} # {name:import}
self.cfg = cfg
self.murmur = murmur
self.meta = None
self.metaCallbacks = {} # {sid:{queue:[handler]}}
self.serverCallbacks = {}
self.contextCallbacks = {}
def __add_to_dict(self, mdict, queue, handler, servers):
for server in servers:
if server in mdict:
if queue in mdict[server]:
if not handler in mdict[server][queue]:
mdict[server][queue].append(handler)
else:
mdict[server][queue] = [handler]
else:
mdict[server] = {queue:[handler]}
def __rem_from_dict(self, mdict, queue, handler, servers):
for server in servers:
try:
mdict[server][queue].remove(handler)
except KeyError, ValueError:
pass
def __announce_to_dict(self, mdict, server, function, *args, **kwargs):
"""
Call function on handlers for specific servers in one of our handler
dictionaries.
@param mdict Dictionary to announce to
@param server Server to announce to, ALL is always implied
@param function Function the handler should call
@param args Arguments for the function
@param kwargs Keyword arguments for the function
"""
# Announce to all handlers of the given serverlist
if server == self.MAGIC_ALL:
servers = mdict.iterkeys()
else:
servers = [self.MAGIC_ALL, server]
for server in servers:
try:
for queue, handlers in mdict[server].iteritems():
for handler in handlers:
self.__call_remote(queue, handler, function, *args, **kwargs)
except KeyError:
# No handler registered for that server
pass
def __call_remote(self, queue, handler, function, *args, **kwargs):
try:
func = getattr(handler, function) # Find out what to call on target
queue.put((None, func, args, kwargs))
except AttributeError, e:
mod = self.queues.get(queue, None)
myname = ""
for name, mymod in self.modules.iteritems():
if mod == mymod:
myname = name
if myname:
self.log().error("Handler class registered by module '%s' does not handle function '%s'. Call failed.", myname, function)
else:
self.log().exception(e)
#
#-- Module multiplexing functionality
#
@local_thread
def announceConnected(self, meta = None):
"""
Call connected handler on all handlers
"""
self.meta = meta
for queue, module in self.queues.iteritems():
self.__call_remote(queue, module, "connected")
@local_thread
def announceDisconnected(self):
"""
Call disconnected handler on all handlers
"""
for queue, module in self.queues.iteritems():
self.__call_remote(queue, module, "disconnected")
@local_thread
def announceMeta(self, server, function, *args, **kwargs):
"""
Call a function on the meta handlers
@param server Server to announce to
@param function Name of the function to call on the handler
@param args List of arguments
@param kwargs List of keyword arguments
"""
self.__announce_to_dict(self.metaCallbacks, server, function, *args, **kwargs)
@local_thread
def announceServer(self, server, function, *args, **kwargs):
"""
Call a function on the server handlers
@param server Server to announce to
@param function Name of the function to call on the handler
@param args List of arguments
@param kwargs List of keyword arguments
"""
self.__announce_to_dict(self.serverCallbacks, server, function, *args, **kwargs)
@local_thread
def announceContext(self, server, function, *args, **kwargs):
"""
Call a function on the context handlers
@param server Server to announce to
@param function Name of the function to call on the handler
@param args List of arguments
@param kwargs List of keyword arguments
"""
self.__announce_to_dict(self.serverCallbacks, server, function, *args, **kwargs)
#
#--- Module self management functionality
#
@local_thread
def subscribeMetaCallbacks(self, queue, handler, servers):
"""
@param queue Target worker queue
@see MumoManagerRemote
"""
return self.__add_to_dict(self.metaCallbacks, queue, handler, servers)
@local_thread
def unsubscribeMetaCallbacks(self, queue, handler, servers):
"""
@param queue Target worker queue
@see MumoManagerRemote
"""
return self.__rem_from_dict(self.metaCallbacks, queue, handler, servers)
@local_thread
def subscribeServerCallbacks(self, queue, handler, servers):
"""
@param queue Target worker queue
@see MumoManagerRemote
"""
return self.__add_to_dict(self.serverCallbacks, queue, handler, servers)
@local_thread
def unsubscribeServerCallbacks(self, queue, handler, servers):
"""
@param queue Target worker queue
@see MumoManagerRemote
"""
return self.__rem_from_dict(self.serverCallbacks, queue, handler, servers)
@local_thread
def subscribeContextCallbacks(self, queue, handler, servers):
"""
@param queue Target worker queue
@see MumoManagerRemote
"""
#TODO: Implement context callbacks
self.log().error("Context callbacks not implemented at this point")
return self.__add_to_dict(self.contextCallbacks, queue, handler, servers)
@local_thread
def unsubscribeContextCallbacks(self, queue, handler, servers):
"""
@param queue Target worker queue
@see MumoManagerRemote
"""
return self.__rem_from_dict(self.contextCallbacks, queue, handler, servers)
def getMurmurModule(self):
"""
Returns the Murmur module generated from the slice file
"""
return self.murmur
def getMeta(self):
"""
Returns the connected servers meta module or None if it is not available
"""
return self.meta
#--- Module load/start/stop/unload functionality
#
@local_thread_blocking
@debug_log(debug_me)
def loadModules(self, names = None):
"""
Loads a list of modules from the mumo directory structure by name.
@param names List of names of modules to load
@return: List of modules loaded
"""
loadedmodules = {}
if not names:
# If no names are given load all modules that have a configuration in the cfg_dir
if not os.path.isdir(self.cfg.modules.cfg_dir):
msg = "Module configuration directory '%s' not found" % self.cfg.modules.cfg_dir
self.log().error(msg)
raise FailedLoadModuleImportException(msg)
names = []
for f in os.listdir(self.cfg.modules.cfg_dir):
if os.path.isfile(self.cfg.modules.cfg_dir + f):
base, ext = os.path.splitext(f)
if not ext or ext.lower() == ".ini" or ext.lower() == ".conf":
names.append(base)
for name in names:
try:
modinst = self._loadModule_noblock(name)
loadedmodules[name] = modinst
except FailedLoadModuleException:
pass
return loadedmodules
@local_thread_blocking
def loadModuleCls(self, name, modcls, module_cfg = None):
return self._loadModuleCls_noblock(name, modcls, module_cfg)
@debug_log(debug_me)
def _loadModuleCls_noblock(self, name, modcls, module_cfg = None):
log = self.log()
if name in self.modules:
log.error("Module '%s' already loaded", name)
return
modqueue = Queue.Queue()
modmanager = MumoManagerRemote(self, name, modqueue)
try:
modinst = modcls(name, modmanager, module_cfg)
except Exception, e:
msg = "Module '%s' failed to initialize" % name
log.error(msg)
log.exception(e)
raise FailedLoadModuleInitializationException(msg)
# Remember it
self.modules[name] = modinst
self.queues[modqueue] = modinst
return modinst
@local_thread_blocking
def loadModule(self, name):
"""
Loads a single module either by name
@param name Name of the module to load
@return Module instance
"""
self._loadModule_noblock(name)
@debug_log(debug_me)
def _loadModule_noblock(self, name):
# Make sure this module is not already loaded
log = self.log()
log.debug("loadModuleByName('%s')", name)
if name in self.modules:
log.warning("Tried to load already loaded module %s", name)
return
# Check whether there is a configuration file for this module
confpath = self.cfg.modules.cfg_dir + name + '.ini'
if not os.path.isfile(confpath):
msg = "Module configuration file '%s' not found" % confpath
log.error(msg)
raise FailedLoadModuleConfigException(msg)
# Make sure the module directory is in our python path and exists
if not self.cfg.modules.mod_dir in sys.path:
if not os.path.isdir(self.cfg.modules.mod_dir):
msg = "Module directory '%s' not found" % self.cfg.modules.mod_dir
log.error(msg)
raise FailedLoadModuleImportException(msg)
sys.path.insert(0, self.cfg.modules.mod_dir)
# Import the module and instanciate it
try:
mod = __import__(name)
self.imports[name] = mod
except ImportError, e:
msg = "Failed to import module '%s', reason: %s" % (name, str(e))
log.error(msg)
raise FailedLoadModuleImportException(msg)
try:
try:
modcls = mod.mumo_module_class # First check if there's a magic mumo_module_class variable
log.debug("Magic mumo_module_class found")
except AttributeError:
modcls = getattr(mod, name)
except AttributeError:
msg = "Module does not contain required class '%s'" % name
log.error(msg)
raise FailedLoadModuleInitializationException(msg)
return self._loadModuleCls_noblock(name, modcls, confpath)
@local_thread_blocking
@debug_log(debug_me)
def startModules(self, names = None):
"""
Start a module by name
@param names List of names of modules to start
@return A dict of started module names and instances
"""
log = self.log()
startedmodules = {}
if not names:
# If no names are given start all models
names = self.modules.iterkeys()
for name in names:
try:
modinst = self.modules[name]
if not modinst.isAlive():
modinst.start()
log.debug("Module '%s' started", name)
else:
log.debug("Module '%s' already running", name)
startedmodules[name] = modinst
except KeyError:
log.error("Could not start unknown module '%s'", name)
return startedmodules
@local_thread_blocking
@debug_log(debug_me)
def stopModules(self, names = None, force = False):
"""
Stop a list of modules by name. Note that this only works
for well behaved modules. At this point if a module is really going
rampant you will have to restart mumo.
@param names List of names of modules to unload
@param force Unload the module asap dropping messages queued for it
@return A dict of stopped module names and instances
"""
log = self.log()
stoppedmodules = {}
if not names:
# If no names are given start all models
names = self.modules.iterkeys()
for name in names:
try:
modinst = self.modules[name]
stoppedmodules[name] = modinst
except KeyError:
log.warning("Asked to stop unknown module '%s'", name)
continue
if force:
# We will have to drain the modules queues
for queue, module in self.queues.iteritems():
if module in self.modules:
try:
while queue.get_nowait(): pass
except Queue.Empty: pass
for modinst in stoppedmodules.itervalues():
if modinst.isAlive():
modinst.stop()
log.debug("Module '%s' is being stopped", name)
else:
log.debug("Module '%s' already stopped", name)
for modinst in stoppedmodules.itervalues():
modinst.join(timeout = self.cfg.modules.timeout)
return stoppedmodules
def stop(self, force = True):
self.log().debug("Stopping")
self.stopModules()
Worker.stop(self, force)