Basic functionality and test coverage for source plugin

This commit is contained in:
Stefan Hacker
2013-03-05 18:50:24 +01:00
parent 108bcda5f0
commit 23ce059c98
4 changed files with 475 additions and 138 deletions

View File

@@ -35,22 +35,40 @@ import sqlite3
class SourceDB(object):
def __init__(self, path = ":memory:"):
"""
Initialize the sqlite database in the given path. If no path
is given the database is created in memory.
"""
self.db = sqlite3.connect(path)
if self.db:
self.db.execute("CREATE TABLE IF NOT EXISTS source(sid INTEGER, cid INTEGER, game TEXT, server TEXT, team INTEGER)")
self.db.execute("VACUUM")
self.db.commit()
def close(self):
"""
Closes the database connection
"""
if self.db:
self.db.commit()
self.db.close()
self.db = None
def isOk(self):
""" True if the database is correctly initialized """
"""
True if the database is correctly initialized
"""
return self.db != None
def cidFor(self, sid, game, server = None, team = None):
"""
Returns the channel id for game specific channel. If only game
is passed the game root channel cid is returned. If additionally
server (and team) are passed the server (/team) channel cid is returned.
If no channel matching the arguments has been registered with the database
before None is returned.
"""
assert(sid != None and game != None)
assert(not (team != None and server == None))
@@ -58,11 +76,18 @@ class SourceDB(object):
return v[0] if v else None
def channelForCid(self, sid, cid):
"""
Returns a tuple of (sid, cid, game, server, team) for the given cid.
Returns None if the cid is unknown.
"""
assert(sid != None and cid != None)
return self.db.execute("SELECT sid, cid, game, server, team FROM source WHERE sid is ? and cid is ?", [sid, cid]).fetchone()
def channelFor(self, sid, game, server = None, team = None):
""" Returns matching channel as (sid, cid, game, server team) tuple """
"""
Returns matching channel as (sid, cid, game, server, team) tuple. Matching
behavior is the same as for cidFor()
"""
assert(sid != None and game != None)
assert(not (team != None and server == None))
@@ -70,6 +95,12 @@ class SourceDB(object):
return v
def channelsFor(self, sid, game, server = None, team = None):
"""
Returns matching channels as a list of (sid, cid, game, server, team) tuples.
If only the game is passed all server and team channels are matched.
This can be limited by passing server (and team).
Returns empty list if no matches are found.
"""
assert(sid != None and game != None)
assert(not (team != None and server == None))
@@ -77,6 +108,9 @@ class SourceDB(object):
return self.db.execute("SELECT sid, cid, game, server, team FROM source WHERE sid is ? and game is ?" + suffix, [sid, game] + params).fetchall()
def registerChannel(self, sid, cid, game, server = None, team = None):
"""
Register a given channel with the database.
"""
assert(sid != None and game != None)
assert(not (team != None and server == None))
@@ -100,6 +134,9 @@ class SourceDB(object):
return ("", [])
def unregisterChannel(self, sid, game, server = None, team = None):
"""
Unregister a channel previously registered with the database.
"""
assert(sid != None and game != None)
assert(not (team != None and server == None))
@@ -108,21 +145,29 @@ class SourceDB(object):
self.db.commit()
def dropChannel(self, sid, cid):
""" Drops channel with given sid + cid """
"""
Drops channel with given sid + cid
"""
self.db.execute("DELETE FROM source WHERE sid is ? and cid is ?", [sid, cid])
self.db.commit()
def isRegisteredChannel(self, sid, cid):
""" Returns true if a channel with given sid and cid is registered """
"""
Returns true if a channel with given sid and cid is registered
"""
res = self.db.execute("SELECT cid FROM source WHERE sid is ? and cid is ?", [sid, cid]).fetchone()
return res != None
def registeredChannels(self):
""" Returns channels as a list of (sid, cid, game, server team) tuples grouped by sid """
"""
Returns channels as a list of (sid, cid, game, server team) tuples grouped by sid
"""
return self.db.execute("SELECT sid, cid, game, server, team FROM source ORDER by sid").fetchall()
def reset(self):
""" Deletes everything in the database """
"""
Deletes everything in the database
"""
self.db.execute("DELETE FROM source")
self.db.commit()

View File

@@ -46,6 +46,11 @@ from users import (User, UserRegistry)
import re
class source(MumoModule):
"""
This class combines the basic mumble moderator callbacks with
server level callbacks for handling source game positional audio
context and identity information.
"""
default_game_config = (
('name', str, "%(game)s"),
('servername', str, "%(server)s"),
@@ -85,6 +90,10 @@ class source(MumoModule):
self.db.close()
def connected(self):
"""
Makes sure the the plugin is correctly configured once the connection
to the mumble server is (re-)established.
"""
cfg = self.cfg()
manager = self.manager()
log = self.log()
@@ -105,6 +114,10 @@ class source(MumoModule):
def validateChannelDB(self):
"""
Makes sure the plugins internal datatbase
matches the actual state of the servers.
"""
log = self.log()
log.debug("Validating channel database")
@@ -127,6 +140,9 @@ class source(MumoModule):
def disconnected(self): pass
def removeFromGroups(self, mumble_server, session, game, server, team):
"""
Removes the client from all relevant groups
"""
sid = mumble_server.id()
prefix = self.cfg().source.groupprefix
game_cid = self.db.cidFor(sid, game)
@@ -141,9 +157,13 @@ class source(MumoModule):
mumble_server.removeUserFromGroup(game_cid, session, group) # Team
def addToGroups(self, mumble_server, session, game, server, team):
"""
Adds the client to all relevant groups
"""
sid = mumble_server.id()
prefix = self.cfg().source.groupprefix
game_cid = self.db.cidFor(sid, game)
assert(game_cid != None)
group = prefix + game
mumble_server.addUserToGroup(game_cid, session, group) # Game
@@ -154,71 +174,116 @@ class source(MumoModule):
group += "_" + str(team)
mumble_server.addUserToGroup(game_cid, session, group) # Team
def transitionPresentUser(self, mumble_server, old, new, sid, user_new):
"""
Transitions a user that has been and is currently playing
"""
assert(new)
target_cid = self.getOrCreateTargetChannelFor(mumble_server, new)
if user_new:
self.dlog(sid, new.state, "User started playing: g/s/t %s/%s/%d", new.game, new.server, new.identity["team"])
self.addToGroups(mumble_server, new.state.session, new.game, new.server, new.identity["team"])
else:
assert old
self.dlog(sid, old.state, "User switched: g/s/t %s/%s/%d", new.game, new.server, new.identity["team"])
self.removeFromGroups(mumble_server, old.state.session, old.game, old.server, old.identity["team"])
self.addToGroups(mumble_server, new.state.session, new.game, new.server, new.identity["team"])
return self.moveUser(mumble_server, new, target_cid)
def transitionGoneUser(self, mumble_server, old, new, sid):
"""
Transitions a user that played but is no longer doing so now.
"""
assert(old)
self.users.remove(sid, old.state.session)
self.removeFromGroups(mumble_server, old.state.session, old.game, old.server, old.identity["team"])
if new:
bcid = self.cfg().source.basechannelid
self.dlog(sid, old.state, "User stopped playing. Moving to %d.", bcid)
self.moveUserToCid(mumble_server, new.state, bcid)
else:
self.dlog(sid, old.state, "User gone")
return True
def userLeftChannel(self, mumble_server, old, sid):
"""
User left channel. Make sure we check for vacancy it if the game it
belongs to is configured that way.
"""
chan = self.db.channelFor(sid, old.game, old.server, old.identity['team'])
if chan:
_, cid, game, _, _ = chan
if self.getGameConfig(game, "deleteifunused"):
self.deleteIfUnused(mumble_server, cid)
def userTransition(self, mumble_server, old, new):
"""
Handles the transition of the user between given old and new states.
If no old state is available (connect, starting to play, ...) old can be
None. If an old state is given it is assumed that it is valid.
If no new state is available (disconnect) new can be None. A new state
can be either valid (playing) or invalid (not or no longer playing).
Depending on the previous and the new state this function performs all
needed actions.
"""
sid = mumble_server.id()
assert(not old or old.valid())
relevant = old or (new and new.valid())
if not relevant:
# User that is not playing. We don't care about those.
return
user_new = not old and new
user_new = not old and new and new.valid()
user_gone = old and (not new or not new.valid())
if not user_gone:
assert(new)
moved = self.transitionPresentUser(mumble_server, old, new, sid, user_new)
if user_new:
self.dlog(sid, new.state, "User started playing: g/s/t %s/%s/%d", new.game, new.server, new.team)
self.addToGroups(mumble_server, new.session, new.game, new.server, new.team)
else:
assert(old);
self.dlog(sid, old.state, "User switched: g/s/t %s/%s/%d", new.game, new.server, new.team)
self.removeFromGroups(mumble_server, old.session, old.game, old.server, old.team)
self.addToGroups(mumble_server, new.session, new.game, new.server, new.team)
moved = self.moveUser(mumble_server, new)
else:
# User gone
assert(old)
self.users.remove(sid, old.state.session)
self.removeFromGroups(mumble_server, old.session, old.game, old.server, old.team)
moved = True
if new:
bcid = self.cfg().source.basechannelid
self.dlog(sid, old.state, "User stopped playing. Moving to %d.", bcid)
self.moveUserToCid(mumble_server, new.state, )
else:
self.dlog(sid, old.state, "User gone")
moved = self.transitionGoneUser(mumble_server, old, new, sid)
if moved and old:
# If moved from a valid game state perform channel use check
chan = self.db.channelFor(sid, old.game, old.server, old.identity['team'])
if chan:
_, cid, game, _, _ = chan
if self.gameCfg(game, "deleteifunused"):
self.deleteIfUnused(mumble_server, cid)
self.userLeftChannel(mumble_server, old, sid)
def getGameName(self, game):
return self.gameCfg(game, "name")
"""
Returns the unexpanded game specific game name template.
"""
return self.getGameConfig(game, "name")
def getServerName(self, game):
return self.gameCfg(game, "servername")
"""
Returns the unexpanded game specific server name template.
"""
return self.getGameConfig(game, "servername")
def getTeamName(self, game, index):
"""
Returns the game specific team name for the given team index.
If the index is invalid the stringified index is returned.
"""
try:
return self.gameCfg(game, "teams")[index]
return self.getGameConfig(game, "teams")[index]
except IndexError:
return str(index)
def setACLsForGameChannel(self, mumble_server, game_cid, game):
"""
Sets the appropriate ACLs for a game channel for the given cid.
"""
# Shorthands
ACL = self.murmur.ACL
EAT = self.murmur.PermissionEnter | self.murmur.PermissionTraverse # Enter And Traverse
@@ -233,20 +298,18 @@ class source(MumoModule):
userid = -1,
group = 'all',
deny = EAT | W | S),
ACL(applyHere = True, # Allow speak to players
applySubs = True,
userid = -1,
group = groupname,
allow = S),
ACL(applyHere = True, # Allow enter and traverse to players
applySubs = False,
userid = -1,
group = groupname,
allow = EAT | W)],
allow = EAT)],
[], True)
def setACLsForServerChannel(self, mumble_server, server_cid, game, server):
"""
Sets the appropriate ACLs for a server channel for the given cid.
"""
# Shorthands
ACL = self.murmur.ACL
EAT = self.murmur.PermissionEnter | self.murmur.PermissionTraverse # Enter And Traverse
@@ -256,25 +319,18 @@ class source(MumoModule):
groupname = '~' + self.cfg().source.groupprefix + game + "_" + server
mumble_server.setACL(server_cid,
[ACL(applyHere = True, # Deny everything
applySubs = True,
userid = -1,
group = 'all',
deny = EAT | W | S),
ACL(applyHere = True, # Allow speak to players
applySubs = True,
userid = -1,
group = groupname,
allow = S),
ACL(applyHere = True, # Allow enter and traverse to players
[ACL(applyHere = True, # Allow enter and traverse to players
applySubs = False,
userid = -1,
group = groupname,
allow = EAT | W)],
allow = EAT)],
[], True)
def setACLsForTeamChannel(self, mumble_server, team_cid, game, server, team):
"""
Sets the appropriate ACLs for a team channel for the given cid.
"""
# Shorthands
ACL = self.murmur.ACL
EAT = self.murmur.PermissionEnter | self.murmur.PermissionTraverse # Enter And Traverse
@@ -292,6 +348,11 @@ class source(MumoModule):
[], True)
def getOrCreateGameChannelFor(self, mumble_server, game, server, sid, cfg, log, namevars):
"""
Helper function for getting or creating only the game channel. Returns
the cid of the exisitng or created game channel.
"""
sid = mumble_server.id()
game_cid = self.db.cidFor(sid, game)
if game_cid == None:
game_channel_name = self.getGameName(game) % namevars
@@ -300,8 +361,8 @@ class source(MumoModule):
self.db.registerChannel(sid, game_cid, game) # Make sure we don't have orphaned server channels around
self.db.unregisterChannel(sid, game, server)
if cfg.source.restrict:
log.debug("(%d) Setting ACL's for new game channel (cid %d)", game_cid)
if self.getGameConfig(game, "restrict"):
log.debug("(%d) Setting ACL's for new game channel (cid %d)", sid, game_cid)
self.setACLsForGameChannel(mumble_server, game_cid, game)
log.debug("(%d) Game channel created and registered (cid %d)", sid, game_cid)
@@ -309,6 +370,11 @@ class source(MumoModule):
def getOrCreateServerChannelFor(self, mumble_server, game, server, team, sid, log, namevars, game_cid):
"""
Helper function for getting or creating only the server channel. The game
channel must already exist. Returns the cid of the existing or created
server channel.
"""
server_cid = self.db.cidFor(sid, game, server)
if server_cid == None:
server_channel_name = self.getServerName(game) % namevars
@@ -317,8 +383,8 @@ class source(MumoModule):
self.db.registerChannel(sid, server_cid, game, server)
self.db.unregisterChannel(sid, game, server, team) # Make sure we don't have orphaned team channels around
if self.cfg().source.restrict:
log.debug("(%d) Setting ACL's for new server channel (cid %d)", server_cid)
if self.getGameConfig(game, "restrict"):
log.debug("(%d) Setting ACL's for new server channel (cid %d)", sid, server_cid)
self.setACLsForServerChannel(mumble_server, server_cid, game, server)
log.debug("(%d) Server channel created and registered (cid %d)", sid, server_cid)
@@ -326,6 +392,12 @@ class source(MumoModule):
def getOrCreateTeamChannelFor(self, mumble_server, game, server, team, sid, log, server_cid):
"""
Helper function for getting or creating only the team channel. Game and
server channel must already exist. Returns the cid of the existing or
created team channel.
"""
team_cid = self.db.cidFor(sid, game, server, team)
if team_cid == None:
team_channel_name = self.getTeamName(game, team)
@@ -333,14 +405,19 @@ class source(MumoModule):
team_cid = mumble_server.addChannel(team_channel_name, server_cid)
self.db.registerChannel(sid, team_cid, game, server, team)
if self.cfg().source.restrict:
log.debug("(%d) Setting ACL's for new team channel (cid %d)", team_cid)
if self.getGameConfig(game, "restrict"):
log.debug("(%d) Setting ACL's for new team channel (cid %d)", sid, team_cid)
self.setACLsForTeamChannel(mumble_server, team_cid, game, server, team)
log.debug("(%d) Team channel created and registered (cid %d)", sid, team_cid)
return team_cid
def getOrCreateChannelFor(self, mumble_server, game, server, team):
"""
Checks whether a requested team channel already exists. If not
all missing parts of the channel structure are created. Returns
the cid of the existing or created team channel.
"""
sid = mumble_server.id()
cfg = self.cfg()
log = self.log()
@@ -355,11 +432,32 @@ class source(MumoModule):
return team_cid
def moveUserToCid(self, server, state, cid):
"""
Low level helper for moving a user to a channel known by its ID
"""
self.dlog(server.id(), state, "Moving from channel %d to %d", state.channel, cid)
state.channel = cid
server.setState(state)
def moveUser(self, mumble_server, user):
def getOrCreateTargetChannelFor(self, mumble_server, user):
"""
Returns the cid of the target channel for this user. If needed
missing channels will be created.
"""
return self.getOrCreateChannelFor(mumble_server,
user.game,
user.server,
user.identity["team"])
def moveUser(self, mumble_server, user, target_cid = None):
"""
Move user according to current game state.
This function performs all tasks of the move including creating
channels if needed or deleting unused ones when appropriate.
If a target_cid is given it is assumed that the channel
structure is already present.
"""
state = user.state
game = user.game
server = user.server
@@ -367,7 +465,9 @@ class source(MumoModule):
sid = mumble_server.id()
source_cid = state.channel
target_cid = self.getOrCreateChannelFor(mumble_server, game, server, team)
if target_cid == None:
target_cid = self.getOrCreateChannelFor(mumble_server, game, server, team)
if source_cid != target_cid:
self.moveUserToCid(mumble_server, state, target_cid)
@@ -420,11 +520,11 @@ class source(MumoModule):
mumble_server.removeChannel(server_channel_cid)
return True
def validGameType(self, game):
def isValidGameType(self, game):
return self.cfg().source.gameregex.match(game) != None
def validServer(self, game, server):
return self.gameCfg(game, "serverregex").match(server) != None
def isValidServer(self, game, server):
return self.getGameConfig(game, "serverregex").match(server) != None
def parseSourceContext(self, context):
"""
@@ -440,7 +540,7 @@ class source(MumoModule):
# Not a source engine context
return (None, None)
if not self.validGameType(game) or not self.validServer(game, server):
if not self.isValidGameType(game) or not self.isValidServer(game, server):
return (None, None)
return (game, server)
@@ -466,8 +566,11 @@ class source(MumoModule):
except (AttributeError, ValueError):
return None
def gameCfg(self, game, variable):
"""Return the game specific value for the given variable if it exists. Otherwise the generic value"""
def getGameConfig(self, game, variable):
"""
Return the game specific value for the given variable if it exists. Otherwise the generic value
"""
sectionname = "game:" + game
cfg = self.cfg()
@@ -481,6 +584,11 @@ class source(MumoModule):
self.log().debug("(%d) (%d|%d) " + what, sid, state.session, state.userid, *argc)
def handle(self, server, new_state):
"""
Takes the updated state of the user and collects all
other required data to perform a state transition for
this user.
"""
sid = server.id()
session = new_state.session
@@ -510,18 +618,34 @@ class source(MumoModule):
#
def userDisconnected(self, server, state, context=None):
"""
Handle disconnect to be able to delete unused channels
and remove user from internal accounting.
"""
sid = server.id()
session = state.session
self.userTransition(server, self.users.get(sid, session), None)
def userStateChanged(self, server, state, context=None):
"""
Default state change for user. Could be something uninteresting for
the plugin like mute/unmute but or something relevant like the context
string change triggered by starting to play.
"""
self.handle(server, state)
def userConnected(self, server, state, context=None):
"""
First time we see the state for a user. userStateChanged behavior
applies.
"""
self.handle(server, state)
def channelRemoved(self, server, state, context=None):
"""
Updates internal accounting for channels controlled by the plugin.
"""
cid = state.id
sid = server.id()

View File

@@ -48,47 +48,100 @@ class StateMock():
class ServerMock():
def __init__(self, sid):
self.sid = sid
self.reset()
self._reset()
def id(self):
return self.sid
def lastChannelID(self):
def _lastChannelID(self):
return self.uid
def addChannel(self, name, parent):
self.name.append(name)
self.parent.append(parent)
self.uid += 1
assert(not self.uid in self.channels)
self.channels[self.uid] = {'name' : name,
'parent' : parent,
'groups' : {},
'acls' : [] }
return self.uid
def addUserToGroup(self, cid, session, group):
c = self._getChan(cid)
if session in c['groups']:
c['groups'][session].add(group)
else:
c['groups'][session] = set([group])
def _getChan(self, cid):
if not cid in self.channels:
raise InvalidChannelExceptionMock()
return self.channels[cid]
def getChannelState(self, cid):
if not cid in self.channel:
raise InvalidChannelExceptionMock()
return {'fake':True}
self._getChan(cid)
return {'fake': True}
def setState(self, state):
self.user_state.append(state)
def reset(self):
def setACL(self, cid, acls, groups, inherit):
c = self._getChan(cid)
c['acls'] = acls
def _reset(self):
self.uid = 1000
self.name = []
self.parent = []
self.channels = {} # See addChannel
self.user_state = []
class ACLMock(object):
def __init__(self, applyHere, applySubs, userid, group, deny = 0, allow = 0):
self.applyHere = applyHere
self.applySubs = applySubs
self.userid = userid
self.group = group
self.deny = deny
self.allow = allow
class MurmurMock():
class MurmurMock(object):
InvalidChannelException = InvalidChannelExceptionMock
ACL = ACLMock
PermissionEnter = 1
PermissionTraverse = 2
PermissionWhisper = 4
PermissionSpeak = 8
def _reset(self): pass
def __init__(self):
pass
class MockACLHelper(object):
E = MurmurMock.PermissionEnter
T = MurmurMock.PermissionTraverse
W = MurmurMock.PermissionWhisper
S = MurmurMock.PermissionSpeak
EAT = E | T
ALL = E|T|W|S
ACLS = MockACLHelper
class MetaMock():
def __init__(self):
self.s = ServerMock(1)
def getServer(self, sid):
assert(sid == self.s.id())
return self.s
def reset(self):
self.s.reset()
def _reset(self):
self.s._reset()
class ManagerMock():
SERVERS_ALL = [-1]
@@ -96,6 +149,7 @@ class ManagerMock():
def __init__(self):
self.q = Queue.Queue()
self.m = MurmurMock()
self.meta = MetaMock()
def getQueue(self):
return self.q
@@ -103,6 +157,9 @@ class ManagerMock():
def getMurmurModule(self):
return self.m
def getMeta(self):
return self.meta
def subscribeServerCallbacks(self, callback, servers):
self.serverCB = {'callback' : callback, 'servers' : servers}
@@ -113,7 +170,7 @@ class Test(unittest.TestCase):
def setUp(self):
self.mm = ManagerMock();
self.mserv = self.mm.m.getServer(1)
self.mserv = self.mm.meta.getServer(1)
testconfig = config.Config(None, source.source.default_config)
@@ -145,7 +202,8 @@ class Test(unittest.TestCase):
def resetState(self):
self.resetDB()
self.mm.m.reset()
self.mm.m._reset()
self.mm.meta._reset()
def tearDown(self):
self.s.disconnected()
@@ -169,8 +227,8 @@ class Test(unittest.TestCase):
self.assertEqual(self.s.cfg().source.basechannelid, 0)
self.assertEqual(self.s.cfg().generic.name, "%(game)s")
self.assertEqual(self.s.gameCfg("wugu", "name"), "%(game)s")
self.assertEqual(self.s.gameCfg("tf", "name"), "Team Fortress 2")
self.assertEqual(self.s.getGameConfig("wugu", "name"), "%(game)s")
self.assertEqual(self.s.getGameConfig("tf", "name"), "Team Fortress 2")
def testIdentityParser(self):
self.resetState()
@@ -225,22 +283,34 @@ class Test(unittest.TestCase):
actual = self.s.parseSourceContext("Source engine: tf\x00[A-1:2807761920(3281)]\x00")
self.assertEqual(none, actual)
def checkACLThings(self, acls, things):
self.assertEqual(len(things), len(acls))
i = 0
for thing in things:
acl = acls[i]
for attr, val in thing.iteritems():
self.assertEqual(getattr(acl, attr), val)
i += 1
def testGetOrCreateChannelFor(self):
mumble_server = self.mserv
prev = mumble_server.lastChannelID()
prev = mumble_server._lastChannelID()
game = "tf"; server = "[A-1:123]"; team = 3
cid = self.s.getOrCreateChannelFor(mumble_server, game, server, team)
self.assertEqual(3, cid - prev)
self.assertEqual(mumble_server.parent[0], 0)
self.assertEqual(mumble_server.parent[1], prev + 1)
self.assertEqual(mumble_server.parent[2], prev + 2)
c = mumble_server.channels
self.assertEqual(mumble_server.name[0], "Team Fortress 2")
self.assertEqual(mumble_server.name[1], "Test tf [A-1:123]")
self.assertEqual(mumble_server.name[2], "Red")
self.assertEqual(c[prev + 1]["parent"], 0)
self.assertEqual(c[prev + 2]["parent"], prev + 1)
self.assertEqual(c[prev + 3]["parent"], prev + 2)
self.assertEqual(c[prev + 1]["name"], "Team Fortress 2")
self.assertEqual(c[prev + 2]["name"], "Test tf [A-1:123]")
self.assertEqual(c[prev + 3]["name"], "Red")
sid = mumble_server.id()
@@ -251,6 +321,13 @@ class Test(unittest.TestCase):
gotcid = self.s.getOrCreateChannelFor(mumble_server, game, server, team)
self.assertEqual(cid, gotcid)
c = mumble_server.channels
self.checkACLThings(c[prev + 3]['acls'], [{'group' : '~source_tf_[A-1:123]_3'}])
self.checkACLThings(c[prev + 2]['acls'], [{'group' : '~source_tf_[A-1:123]'}])
self.checkACLThings(c[prev + 1]['acls'], [{},
{'group' : '~source_tf'}])
#print self.s.db.db.execute("SELECT * FROM source").fetchall()
def testGetGameName(self):
@@ -277,34 +354,34 @@ class Test(unittest.TestCase):
def testValidGameType(self):
self.resetState()
self.assertTrue(self.s.validGameType("dod"))
self.assertTrue(self.s.validGameType("cstrike"))
self.assertTrue(self.s.validGameType("tf"))
self.assertTrue(self.s.isValidGameType("dod"))
self.assertTrue(self.s.isValidGameType("cstrike"))
self.assertTrue(self.s.isValidGameType("tf"))
self.assertFalse(self.s.validGameType("dodx"))
self.assertFalse(self.s.validGameType("xdod"))
self.assertFalse(self.s.validGameType(""))
self.assertFalse(self.s.isValidGameType("dodx"))
self.assertFalse(self.s.isValidGameType("xdod"))
self.assertFalse(self.s.isValidGameType(""))
def testValidServer(self):
self.resetState()
self.assertTrue(self.s.validServer("dod", "[A-1:2807761920(3281)]"))
self.assertTrue(self.s.isValidServer("dod", "[A-1:2807761920(3281)]"))
self.assertFalse(self.s.validServer("dod", "A-1:2807761920(3281)]"))
self.assertFalse(self.s.validServer("dod", "[A-1:2807761920(3281)"))
self.assertFalse(self.s.validServer("dod", "[A-1:2807761920(3281)&]"))
self.assertFalse(self.s.isValidServer("dod", "A-1:2807761920(3281)]"))
self.assertFalse(self.s.isValidServer("dod", "[A-1:2807761920(3281)"))
self.assertFalse(self.s.isValidServer("dod", "[A-1:2807761920(3281)&]"))
self.assertTrue(self.s.validServer("tf", "[A-1:123]"))
self.assertTrue(self.s.isValidServer("tf", "[A-1:123]"))
self.assertFalse(self.s.validServer("tf", "x[A-1:123]"))
self.assertFalse(self.s.validServer("tf", "[A-1:123]x"))
self.assertFalse(self.s.isValidServer("tf", "x[A-1:123]"))
self.assertFalse(self.s.isValidServer("tf", "[A-1:123]x"))
def testMoveUser(self):
self.resetState()
mumble_server = self.mserv
user_state = StateMock()
prev = self.mserv.lastChannelID()
prev = self.mserv._lastChannelID()
TEAM_BLUE = 2
TEAM_RED = 3
@@ -315,29 +392,104 @@ class Test(unittest.TestCase):
TEAM_RED_SID = prev + 3
TEAM_BLUE_SID = prev + 4
self.s.moveUser(self.mserv, user_state, "tf", "[A-1:123]", TEAM_BLUE)
user = source.User(user_state, {'team':TEAM_BLUE}, "tf", "[A-1:123]")
self.s.moveUser(self.mserv, user)
c = mumble_server.channels
self.assertEqual(c[prev + 1]["parent"], BASE_SID)
self.assertEqual(c[prev + 2]["parent"], GAME_SID)
self.assertEqual(c[prev + 3]["parent"], SERVER_SID)
self.assertEqual(mumble_server.parent[0], BASE_SID)
self.assertEqual(mumble_server.parent[1], GAME_SID)
self.assertEqual(mumble_server.parent[2], SERVER_SID)
self.assertEqual(mumble_server.name[0], "Team Fortress 2")
self.assertEqual(mumble_server.name[1], "Test tf [A-1:123]")
self.assertEqual(mumble_server.name[2], "Blue")
self.assertEqual(len(mumble_server.name), 3)
self.assertEqual(c[prev + 1]["name"], "Team Fortress 2")
self.assertEqual(c[prev + 2]["name"], "Test tf [A-1:123]")
self.assertEqual(c[prev + 3]["name"], "Blue")
self.assertEqual(len(c), 3)
self.assertEqual(user_state.channel, TEAM_RED_SID)
self.assertEqual(mumble_server.user_state[0], user_state)
self.s.moveUser(self.mserv, user_state, "tf", "[A-1:123]", TEAM_RED)
user.identity['team'] = TEAM_RED
self.s.moveUser(self.mserv, user)
self.assertEqual(mumble_server.parent[3], SERVER_SID)
self.assertEqual(mumble_server.name[3], "Red")
self.assertEqual(len(mumble_server.parent), 4)
self.assertEqual(c[prev + 4]["parent"], SERVER_SID)
self.assertEqual(c[prev + 4]["name"], "Red")
self.assertEqual(len(c), 4)
self.assertEqual(user_state.channel, TEAM_BLUE_SID)
self.assertEqual(mumble_server.user_state[0], user_state)
def testSetACLsForGameChannel(self):
self.resetState()
mumble_server = self.mserv
cid = mumble_server.addChannel("test", 1); game = "dod"
self.s.setACLsForGameChannel(mumble_server, cid, game)
acls = mumble_server.channels[cid]['acls']
self.checkACLThings(acls, [{'applyHere' : True,
'applySubs' : True,
'userid' : -1,
'group' : 'all',
'deny' : ACLS.ALL,
'allow' : 0},
{'applyHere' : True,
'applySubs' : False,
'userid' : -1,
'group' : '~source_dod',
'deny' : 0,
'allow' : ACLS.EAT}])
def testSetACLsForServerChannel(self):
self.resetState()
mumble_server = self.mserv
cid = mumble_server.addChannel("test", 1); game = "tf"; server = "[A-1:SomeServer]"
self.s.setACLsForServerChannel(mumble_server, cid, game, server)
acls = mumble_server.channels[cid]['acls']
self.checkACLThings(acls, [{'applyHere' : True,
'applySubs' : False,
'userid' : -1,
'group' : '~source_tf_[A-1:SomeServer]',
'deny' : 0,
'allow' : ACLS.EAT}])
def testSetACLsForTeamChannel(self):
self.resetState()
mumble_server = self.mserv
cid = mumble_server.addChannel("test", 1); game = "tf"; server = "[A-1:SomeServer]"; team = 2
self.s.setACLsForTeamChannel(mumble_server, cid, game, server, team)
acls = mumble_server.channels[cid]['acls']
self.checkACLThings(acls, [{'applyHere' : True,
'applySubs' : False,
'userid' : -1,
'group' : '~source_tf_[A-1:SomeServer]_2',
'deny' : 0,
'allow' : ACLS.ALL}])
def testAddToGroups(self):
self.resetState()
mumble_server = self.mserv
prev = mumble_server._lastChannelID()
session = 10; game = 'cstrike'; server = '[A-1:12345]'; team = 1
self.s.getOrCreateChannelFor(mumble_server, game, server, team)
# Test
self.s.addToGroups(mumble_server, session, game, server, team)
groups = mumble_server.channels[prev + 1]['groups'][session]
self.assertIn("source_cstrike", groups)
self.assertIn("source_cstrike_[A-1:12345]", groups)
self.assertIn("source_cstrike_[A-1:12345]_1", groups)
if __name__ == "__main__":
#logging.basicConfig(level = logging.DEBUG)

View File

@@ -41,20 +41,28 @@ class User(object):
self.game = game
def valid(self):
""" True if valid data is available for all fields """
"""
True if valid data is available for all fields
"""
return self.state and self.identity and self.server and self.game
def hasContextOrIdentityChanged(self, otherstate):
""" Checks whether the given state diverges from this users's """
"""
Checks whether the given state diverges from this users's
"""
return self.state.context != otherstate.context or \
self.state.identity != otherstate.identity
def updateState(self, state):
""" Updates the state of this user """
"""
Updates the state of this user
"""
self.state = state
def updateData(self, identity, game, server):
""" Updates the data fields for this user """
"""
Updates the data fields for this user
"""
self.identity = identity
self.game = game
self.server = server
@@ -69,14 +77,18 @@ class UserRegistry(object):
self.users = {} # {session:user, ...}
def get(self, sid, session):
""" Return user or None from registry """
"""
Return user or None from registry
"""
try:
return self.users[sid][session]
except KeyError:
return None
def add(self, sid, session, user):
""" Add new user to registry """
"""
Add new user to registry
"""
assert(isinstance(user, User))
if not sid in self.users:
@@ -88,7 +100,9 @@ class UserRegistry(object):
return True
def addOrUpdate(self, sid, session, user):
""" Add user or overwrite existing one """
"""
Add user or overwrite existing one (identified by sid + session)
"""
assert(isinstance(user, User))
if not sid in self.users:
@@ -99,7 +113,9 @@ class UserRegistry(object):
return True
def remove(self, sid, session):
""" Remove user from registry """
"""
Remove user from registry
"""
try:
del self.users[sid][session]
except KeyError: