This commit is contained in:
2016-06-01 21:03:15 +02:00
parent c42113d712
commit 27d0d313d2
27 changed files with 1657 additions and 1518 deletions

View File

@@ -1 +1 @@
from source import source
from .source import source

View File

@@ -1,4 +1,4 @@
#!/usr/bin/env python
#!/usr/bin/env python3
# -*- coding: utf-8
# Copyright (C) 2013 Stefan Hacker <dd0t@users.sourceforge.net>
@@ -31,13 +31,14 @@
import sqlite3
#TODO: Functions returning channels probably should return a dict instead of a tuple
# TODO: Functions returning channels probably should return a dict instead of a tuple
class SourceDB(object):
NO_SERVER = ""
NO_TEAM = -1
def __init__(self, path = ":memory:"):
def __init__(self, path=":memory:"):
"""
Initialize the sqlite database in the given path. If no path
is given the database is created in memory.
@@ -54,7 +55,7 @@ class SourceDB(object):
UNIQUE(sid, cid),
PRIMARY KEY (sid, game, server, team)
)""")
self.db.execute("""
CREATE TABLE IF NOT EXISTS mapped_names (
sid INTEGER NOT NULL,
@@ -75,38 +76,39 @@ class SourceDB(object):
self.db.commit()
self.db.close()
self.db = None
def isOk(self):
"""
True if the database is correctly initialized
"""
return self.db != None
def nameFor(self, sid, game, server = NO_SERVER, team = NO_TEAM, default = ""):
return self.db is not None
def nameFor(self, sid, game, server=NO_SERVER, team=NO_TEAM, default=""):
"""
Returns the mapped name for the given parameters or default if no
mapping exists.
"""
assert(sid != None and game != None)
assert(not (team != self.NO_TEAM and server == self.NO_SERVER))
v = self.db.execute("SELECT name FROM mapped_names WHERE sid is ? and game is ? and server is ? and team is ?", [sid, game, server, team]).fetchone()
assert (sid is not None and game is not None)
assert (not (team != self.NO_TEAM and server == self.NO_SERVER))
v = self.db.execute("SELECT name FROM mapped_names WHERE sid is ? and game is ? and server is ? and team is ?",
[sid, game, server, team]).fetchone()
return v[0] if v else default
def mapName(self, name, sid, game, server = NO_SERVER, team = NO_TEAM):
def mapName(self, name, sid, game, server=NO_SERVER, team=NO_TEAM):
"""
Stores a mapping for the given (sid, game, server, team) combination
to the given name. The mapping can then be retrieved with nameFor() in
the future.
"""
assert(sid != None and game != None)
assert(not (team != self.NO_TEAM and server == self.NO_SERVER))
self.db.execute("INSERT OR REPLACE into mapped_names (sid, game, server, team, name) VALUES (?,?,?,?,?)",[sid, game, server, team, name])
assert (sid is not None and game is not None)
assert (not (team != self.NO_TEAM and server == self.NO_SERVER))
self.db.execute("INSERT OR REPLACE into mapped_names (sid, game, server, team, name) VALUES (?,?,?,?,?)",
[sid, game, server, team, name])
self.db.commit()
def cidFor(self, sid, game, server = NO_SERVER, team = NO_TEAM):
def cidFor(self, sid, game, server=NO_SERVER, team=NO_TEAM):
"""
Returns the channel id for game specific channel. If only game
is passed the game root channel cid is returned. If additionally
@@ -115,10 +117,12 @@ class SourceDB(object):
If no channel matching the arguments has been registered with the database
before None is returned.
"""
assert(sid != None and game != None)
assert(not (team != self.NO_TEAM and server == self.NO_SERVER))
v = self.db.execute("SELECT cid FROM controlled_channels WHERE sid is ? and game is ? and server is ? and team is ?", [sid, game, server, team]).fetchone()
assert (sid is not None and game is not None)
assert (not (team != self.NO_TEAM and server == self.NO_SERVER))
v = self.db.execute(
"SELECT cid FROM controlled_channels WHERE sid is ? and game is ? and server is ? and team is ?",
[sid, game, server, team]).fetchone()
return v[0] if v else None
def channelForCid(self, sid, cid):
@@ -126,44 +130,51 @@ class SourceDB(object):
Returns a tuple of (sid, cid, game, server, team) for the given cid.
Returns None if the cid is unknown.
"""
assert(sid != None and cid != None)
return self.db.execute("SELECT sid, cid, game, server, team FROM controlled_channels WHERE sid is ? and cid is ?", [sid, cid]).fetchone()
def channelFor(self, sid, game, server = NO_SERVER, team = NO_TEAM):
assert (sid is not None and cid is not None)
return self.db.execute(
"SELECT sid, cid, game, server, team FROM controlled_channels WHERE sid is ? and cid is ?",
[sid, cid]).fetchone()
def channelFor(self, sid, game, server=NO_SERVER, team=NO_TEAM):
"""
Returns matching channel as (sid, cid, game, server, team) tuple. Matching
behavior is the same as for cidFor()
"""
assert(sid != None and game != None)
assert(not (team != self.NO_TEAM and server == self.NO_SERVER))
v = self.db.execute("SELECT sid, cid, game, server, team FROM controlled_channels WHERE sid is ? and game is ? and server is ? and team is ?", [sid, game, server, team]).fetchone()
assert (sid is not None and game is not None)
assert (not (team != self.NO_TEAM and server == self.NO_SERVER))
v = self.db.execute(
"SELECT sid, cid, game, server, team FROM controlled_channels WHERE sid is ? and game is ? and server is ? and team is ?",
[sid, game, server, team]).fetchone()
return v
def channelsFor(self, sid, game, server = NO_SERVER, team = NO_TEAM):
def channelsFor(self, sid, game, server=NO_SERVER, team=NO_TEAM):
"""
Returns matching channels as a list of (sid, cid, game, server, team) tuples.
If only the game is passed all server and team channels are matched.
This can be limited by passing server (and team).
Returns empty list if no matches are found.
"""
assert(sid != None and game != None)
assert(not (team != self.NO_TEAM and server == self.NO_SERVER))
assert (sid is not None and game is not None)
assert (not (team != self.NO_TEAM and server == self.NO_SERVER))
suffix, params = self.__whereClauseForOptionals(server, team)
return self.db.execute("SELECT sid, cid, game, server, team FROM controlled_channels WHERE sid is ? and game is ?" + suffix, [sid, game] + params).fetchall()
def registerChannel(self, sid, cid, game, server = NO_SERVER, team = NO_TEAM):
return self.db.execute(
"SELECT sid, cid, game, server, team FROM controlled_channels WHERE sid is ? and game is ?" + suffix,
[sid, game] + params).fetchall()
def registerChannel(self, sid, cid, game, server=NO_SERVER, team=NO_TEAM):
"""
Register a given channel with the database.
"""
assert(sid != None and game != None)
assert(not (team != self.NO_TEAM and server == self.NO_SERVER))
self.db.execute("INSERT INTO controlled_channels (sid, cid, game, server, team) VALUES (?,?,?,?,?)", [sid, cid, game, server, team])
assert (sid is not None and game is not None)
assert (not (team != self.NO_TEAM and server == self.NO_SERVER))
self.db.execute("INSERT INTO controlled_channels (sid, cid, game, server, team) VALUES (?,?,?,?,?)",
[sid, cid, game, server, team])
self.db.commit()
return True
def __whereClauseForOptionals(self, server, team):
"""
Generates where class conditions that interpret missing server
@@ -171,49 +182,49 @@ class SourceDB(object):
Returns (suffix, additional parameters) tuple
"""
if server != self.NO_SERVER and team != self.NO_TEAM:
return (" and server is ? and team is ?", [server, team])
return " and server is ? and team is ?", [server, team]
elif server != self.NO_SERVER:
return (" and server is ?", [server])
return " and server is ?", [server]
else:
return ("", [])
def unregisterChannel(self, sid, game, server = NO_SERVER, team = NO_TEAM):
return "", []
def unregisterChannel(self, sid, game, server=NO_SERVER, team=NO_TEAM):
"""
Unregister a channel previously registered with the database.
"""
assert(sid != None and game != None)
assert(not (team != self.NO_TEAM and server == self.NO_SERVER))
assert (sid is not None and game is not None)
assert (not (team != self.NO_TEAM and server == self.NO_SERVER))
suffix, params = self.__whereClauseForOptionals(server, team)
self.db.execute("DELETE FROM controlled_channels WHERE sid is ? and game is ?" + suffix, [sid, game] + params)
self.db.commit()
def dropChannel(self, sid, cid):
"""
Drops channel with given sid + cid
"""
assert(sid != None and cid != None)
assert (sid is not None and cid is not None)
self.db.execute("DELETE FROM controlled_channels WHERE sid is ? and cid is ?", [sid, cid])
self.db.commit()
def isRegisteredChannel(self, sid, cid):
"""
Returns true if a channel with given sid and cid is registered
"""
assert(sid != None and cid != None)
assert (sid is not None and cid is not None)
res = self.db.execute("SELECT cid FROM controlled_channels WHERE sid is ? and cid is ?", [sid, cid]).fetchone()
return res != None
return res is not None
def registeredChannels(self):
"""
Returns channels as a list of (sid, cid, game, server team) tuples grouped by sid
"""
return self.db.execute("SELECT sid, cid, game, server, team FROM controlled_channels ORDER by sid").fetchall()
def reset(self):
"""
Deletes everything in the database
@@ -221,6 +232,7 @@ class SourceDB(object):
self.db.execute("DELETE FROM mapped_names")
self.db.execute("DELETE FROM controlled_channels")
self.db.commit()
if __name__ == "__main__":
pass

View File

@@ -1,4 +1,4 @@
#!/usr/bin/env python
#!/usr/bin/env python3
# -*- coding: utf-8
# Copyright (C) 2013 Stefan Hacker <dd0t@users.sourceforge.net>
@@ -29,9 +29,11 @@
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import unittest
from db import SourceDB
import sqlite3
import unittest
from .db import SourceDB
class SourceDBTest(unittest.TestCase):
def setUp(self):
@@ -40,227 +42,261 @@ class SourceDBTest(unittest.TestCase):
def tearDown(self):
self.db.close()
def testOk(self):
self.db.reset()
self.assertTrue(self.db.isOk())
def testSingleChannel(self):
self.db.reset()
sid = 5; cid = 10; game = "tf2"; server = "abc[]def"; team = "1"
sid = 5
cid = 10
game = "tf2"
server = "abc[]def"
team = "1"
self.assertTrue(self.db.registerChannel(sid, cid, game, server, team))
self.assertEqual(self.db.cidFor(sid, game, server, team), cid)
self.db.unregisterChannel(sid, game, server, team)
self.assertEqual(self.db.cidFor(sid, game, server, team), None)
def testChannelTree(self):
self.db.reset()
sid = 5; game = "tf2"; server = "abc[]def"; team = 0
bcid = 10; scid = 11; tcid = 12
sid = 5
game = "tf2"
server = "abc[]def"
team = 0
bcid = 10
scid = 11
tcid = 12
self.assertTrue(self.db.registerChannel(sid, 1, "canary", server, team))
# Delete whole tree
self.assertTrue(self.db.registerChannel(sid, bcid, game))
self.assertTrue(self.db.registerChannel(sid, scid, game, server))
self.assertTrue(self.db.registerChannel(sid, tcid, game, server, team))
self.assertEqual(self.db.cidFor(sid, game), bcid)
self.assertEqual(self.db.cidFor(sid, game, server), scid)
self.assertEqual(self.db.cidFor(sid, game, server, team), tcid)
self.assertEqual(self.db.cidFor(sid+1, game, server, team), None)
self.assertEqual(self.db.cidFor(sid + 1, game, server, team), None)
self.db.unregisterChannel(sid, game)
self.assertEqual(self.db.cidFor(sid, game, server, team), None)
self.assertEqual(self.db.cidFor(sid, game, server), None)
self.assertEqual(self.db.cidFor(sid, game), None)
# Delete server channel
self.assertTrue(self.db.registerChannel(sid, bcid, game))
self.assertTrue(self.db.registerChannel(sid, scid, game, server))
self.assertTrue(self.db.registerChannel(sid, tcid, game, server, team))
self.db.unregisterChannel(sid, game, server)
self.assertEqual(self.db.cidFor(sid, game), bcid)
self.assertEqual(self.db.cidFor(sid, game, server), None)
self.assertEqual(self.db.cidFor(sid, game, server, team), None)
self.db.unregisterChannel(sid, game)
# Delete team channel
self.assertTrue(self.db.registerChannel(sid, bcid, game))
self.assertTrue(self.db.registerChannel(sid, scid, game, server))
self.assertTrue(self.db.registerChannel(sid, tcid, game, server, team))
self.db.unregisterChannel(sid, game, server, team)
self.assertEqual(self.db.cidFor(sid, game), bcid)
self.assertEqual(self.db.cidFor(sid, game, server), scid)
self.assertEqual(self.db.cidFor(sid, game, server, team), None)
self.db.unregisterChannel(sid, game)
# Check canary
self.assertEqual(self.db.cidFor(sid, "canary", server, team), 1)
self.db.unregisterChannel(sid, "canary", server, team)
def testDropChannel(self):
self.db.reset()
sid = 1; cid = 5; game = "tf"
sid = 1
cid = 5
game = "tf"
self.db.registerChannel(sid, cid, game)
self.db.dropChannel(sid + 1, cid)
self.assertEqual(self.db.cidFor(sid, game), cid)
self.db.dropChannel(sid, cid)
self.assertEqual(self.db.cidFor(sid, game), None)
def testRegisteredChannels(self):
self.db.reset()
sid = 5; game = "tf2"; server = "abc[]def"; team = 1
bcid = 10; scid = 11; tcid = 12;
sid = 5
game = "tf2"
server = "abc[]def"
team = 1
bcid = 10
scid = 11
tcid = 12
self.db.registerChannel(sid, bcid, game)
self.db.registerChannel(sid, scid, game, server)
self.db.registerChannel(sid+1, tcid, game, server, team)
self.db.registerChannel(sid + 1, tcid, game, server, team)
self.db.registerChannel(sid, tcid, game, server, team)
expected = [(sid, bcid, game, self.db.NO_SERVER, self.db.NO_TEAM),
(sid, scid, game, server, self.db.NO_TEAM),
(sid, tcid, game, server, team),
(sid+1, tcid, game, server, team)]
(sid + 1, tcid, game, server, team)]
self.assertEqual(self.db.registeredChannels(), expected)
def testIsRegisteredChannel(self):
self.db.reset()
sid = 1; cid = 0; game = "tf"
sid = 1
cid = 0
game = "tf"
self.db.registerChannel(sid, cid, game)
self.assertTrue(self.db.isRegisteredChannel(sid, cid))
self.assertFalse(self.db.isRegisteredChannel(sid+1, cid))
self.assertFalse(self.db.isRegisteredChannel(sid, cid+1))
self.assertFalse(self.db.isRegisteredChannel(sid + 1, cid))
self.assertFalse(self.db.isRegisteredChannel(sid, cid + 1))
self.db.unregisterChannel(sid, game)
self.assertFalse(self.db.isRegisteredChannel(sid, cid))
def testChannelFor(self):
self.db.reset()
sid = 1; cid = 0; game = "tf"; server = "serv"; team = 0
sid = 1
cid = 0
game = "tf"
server = "serv"
team = 0
self.db.registerChannel(sid, cid, game)
self.db.registerChannel(sid, cid+1, game, server)
self.db.registerChannel(sid, cid+2, game, server, team)
self.db.registerChannel(sid, cid + 1, game, server)
self.db.registerChannel(sid, cid + 2, game, server, team)
res = self.db.channelFor(sid, game, server, team)
self.assertEqual(res, (sid, cid + 2, game, server, team))
res = self.db.channelFor(sid, game, server)
self.assertEqual(res, (sid, cid + 1, game, server, self.db.NO_TEAM))
res = self.db.channelFor(sid, game)
self.assertEqual(res, (sid, cid, game, self.db.NO_SERVER, self.db.NO_TEAM))
res = self.db.channelFor(sid, game, server, team+5)
res = self.db.channelFor(sid, game, server, team + 5)
self.assertEqual(res, None)
def testChannelForCid(self):
self.db.reset()
sid = 1; cid = 0; game = "tf"; server = "serv"; team = 0
sid = 1
cid = 0
game = "tf"
server = "serv"
team = 0
self.db.registerChannel(sid, cid, game)
self.db.registerChannel(sid, cid+1, game, server)
self.db.registerChannel(sid, cid+2, game, server, team)
self.db.registerChannel(sid, cid + 1, game, server)
self.db.registerChannel(sid, cid + 2, game, server, team)
res = self.db.channelForCid(sid, cid)
self.assertEqual(res, (sid, cid, game, self.db.NO_SERVER, self.db.NO_TEAM))
res = self.db.channelForCid(sid, cid + 1)
self.assertEqual(res, (sid, cid + 1, game, server, self.db.NO_TEAM))
res = self.db.channelForCid(sid, cid + 2)
self.assertEqual(res, (sid, cid + 2, game, server, team))
res = self.db.channelForCid(sid, cid + 3)
self.assertEqual(res, None)
def testChannelsFor(self):
self.db.reset()
sid = 1; cid = 0; game = "tf"; server = "serv"; team = 0
sid = 1
cid = 0
game = "tf"
server = "serv"
team = 0
self.db.registerChannel(sid, cid, game)
self.db.registerChannel(sid, cid+1, game, server)
self.db.registerChannel(sid, cid+2, game, server, team)
chans = ((sid, cid+2, game, server, team),
(sid, cid+1, game, server, self.db.NO_TEAM),
self.db.registerChannel(sid, cid + 1, game, server)
self.db.registerChannel(sid, cid + 2, game, server, team)
chans = ((sid, cid + 2, game, server, team),
(sid, cid + 1, game, server, self.db.NO_TEAM),
(sid, cid, game, self.db.NO_SERVER, self.db.NO_TEAM))
res = self.db.channelsFor(sid, game, server, team)
self.assertItemsEqual(res, chans[0:1])
self.assertCountEqual(res, chans[0:1])
res = self.db.channelsFor(sid, game, server)
self.assertItemsEqual(res, chans[0:2])
self.assertCountEqual(res, chans[0:2])
res = self.db.channelsFor(sid, game)
self.assertItemsEqual(res, chans)
res = self.db.channelsFor(sid+1, game)
self.assertItemsEqual(res, [])
self.assertCountEqual(res, chans)
res = self.db.channelsFor(sid + 1, game)
self.assertCountEqual(res, [])
def testChannelTableConstraints(self):
self.db.reset()
# cid constraint
sid = 1; cid = 0; game = "tf"; server = "serv"; team = 0
sid = 1
cid = 0
game = "tf"
server = "serv"
team = 0
self.db.registerChannel(sid, cid, game)
self.assertRaises(sqlite3.IntegrityError, self.db.registerChannel, sid, cid, "cstrike")
# combination constraint
self.assertRaises(sqlite3.IntegrityError, self.db.registerChannel, sid, cid+1000, game)
self.db.registerChannel(sid, cid+1, game, server)
self.assertRaises(sqlite3.IntegrityError, self.db.registerChannel, sid, cid+100, game, server)
self.db.registerChannel(sid, cid+2, game, server, team)
self.assertRaises(sqlite3.IntegrityError, self.db.registerChannel, sid, cid+200, game, server, team)
self.assertRaises(sqlite3.IntegrityError, self.db.registerChannel, sid, cid + 1000, game)
self.db.registerChannel(sid, cid + 1, game, server)
self.assertRaises(sqlite3.IntegrityError, self.db.registerChannel, sid, cid + 100, game, server)
self.db.registerChannel(sid, cid + 2, game, server, team)
self.assertRaises(sqlite3.IntegrityError, self.db.registerChannel, sid, cid + 200, game, server, team)
def testChannelNameMappingTableConstraints(self):
self.db.reset()
sid = 1; game = "tf"
sid = 1
game = "tf"
# mapName performs an INSERT OR REPLACE which relies on the UNIQUE constraint
self.db.mapName("SomeTestName", sid, game)
self.db.mapName("SomeOtherName", sid, game)
self.assertEqual(self.db.nameFor(sid, game), "SomeOtherName")
def testNameMapping(self):
self.db.reset()
sid = 1; game = "tf"; server = "[12313]";team = 2
self.assertEqual(self.db.nameFor(sid, game, default = "test"), "test")
sid = 1
game = "tf"
server = "[12313]"
team = 2
self.assertEqual(self.db.nameFor(sid, game, default="test"), "test")
self.db.mapName("Game", sid, game)
self.db.mapName("Game Server", sid, game, server)
self.db.mapName("Game Server Team", sid, game, server, team)
self.db.mapName("Game Server Team 2", sid + 1, game, server, team)
self.db.mapName("Game Server Team 2", sid, "cstrike", server, team)
self.assertEqual(self.db.nameFor(sid, game), "Game")
self.assertEqual(self.db.nameFor(sid, game, server), "Game Server")
self.assertEqual(self.db.nameFor(sid, game, server, team), "Game Server Team")
if __name__ == "__main__":
#import sys;sys.argv = ['', 'Test.testName']
unittest.main()
# import sys;sys.argv = ['', 'Test.testName']
unittest.main()

View File

@@ -1,4 +1,4 @@
#!/usr/bin/env python
#!/usr/bin/env python3
# -*- coding: utf-8
# Copyright (C) 2013 Stefan Hacker <dd0t@users.sourceforge.net>
@@ -35,16 +35,15 @@
# gamestate reported by Mumble positional audio plugins
#
from mumo_module import (MumoModule,
commaSeperatedIntegers,
commaSeperatedStrings,
x2bool)
from db import SourceDB
from users import (User, UserRegistry)
import re
from config import commaSeperatedStrings, x2bool, commaSeperatedIntegers
from mumo_module import MumoModule
from .db import SourceDB
from .users import (User, UserRegistry)
# noinspection PyPep8Naming
class source(MumoModule):
"""
This class combines the basic mumble moderator callbacks with
@@ -52,30 +51,30 @@ class source(MumoModule):
context and identity information.
"""
default_game_config = (
('name', str, "%(game)s"),
('servername', str, "%(server)s"),
('teams', commaSeperatedStrings, ["Lobby", "Spectator", "Team one", "Team two", "Team three", "Team four"]),
('restrict', x2bool, True),
('serverregex', re.compile, re.compile("^\[[\w\d\-\(\):]{1,20}\]$")),
('deleteifunused', x2bool, True)
)
default_config = {'source':(
('database', str, "source.sqlite"),
('basechannelid', int, 0),
('mumbleservers', commaSeperatedIntegers, []),
('gameregex', re.compile, re.compile("^(tf|dod|cstrike|hl2mp)$")),
('groupprefix', str, "source_")
),
# The generic section defines default values which can be overridden in
# optional game specific "game:<gameshorthand>" sections
('name', str, "%(game)s"),
('servername', str, "%(server)s"),
('teams', commaSeperatedStrings, ["Lobby", "Spectator", "Team one", "Team two", "Team three", "Team four"]),
('restrict', x2bool, True),
('serverregex', re.compile, re.compile("^\[[\w\d\-\(\):]{1,20}\]$")),
('deleteifunused', x2bool, True)
)
default_config = {'source': (
('database', str, "source.sqlite"),
('basechannelid', int, 0),
('mumbleservers', commaSeperatedIntegers, []),
('gameregex', re.compile, re.compile("^(tf|dod|cstrike|hl2mp)$")),
('groupprefix', str, "source_")
),
# The generic section defines default values which can be overridden in
# optional game specific "game:<gameshorthand>" sections
'generic': default_game_config,
lambda x: re.match('^game:\w+$', x): default_game_config
}
'generic': default_game_config,
lambda x: re.match('^game:\w+$', x): default_game_config
}
def __init__(self, name, manager, configuration=None):
MumoModule.__init__(self, name, manager, configuration)
self.murmur = manager.getMurmurModule()
@@ -84,11 +83,11 @@ class source(MumoModule):
MumoModule.onStart(self)
cfg = self.cfg()
self.db = SourceDB(cfg.source.database)
def onStop(self):
MumoModule.onStop(self)
self.db.close()
def connected(self):
"""
Makes sure the the plugin is correctly configured once the connection
@@ -98,21 +97,20 @@ class source(MumoModule):
manager = self.manager()
log = self.log()
log.debug("Register for Server callbacks")
self.meta = manager.getMeta()
servers = set(cfg.source.mumbleservers)
if not servers:
servers = manager.SERVERS_ALL
self.users = UserRegistry()
self.validateChannelDB()
manager.subscribeServerCallbacks(self, servers)
manager.subscribeMetaCallbacks(self, servers)
def validateChannelDB(self):
"""
Makes sure the plugins internal datatbase
@@ -120,32 +118,33 @@ class source(MumoModule):
"""
log = self.log()
log.debug("Validating channel database")
current_sid = -1
current_mumble_server = None
for sid, cid, game, server, team in self.db.registeredChannels():
if current_sid != sid:
current_mumble_server = self.meta.getServer(sid)
current_sid = sid
try:
state = current_mumble_server.getChannelState(cid)
self.db.mapName(state.name, sid, game, server, team)
#TODO: Verify ACL?
# TODO: Verify ACL?
except self.murmur.InvalidChannelException:
# Channel no longer exists
log.debug("(%d) Channel %d no longer exists. Dropped.", sid, cid)
self.db.dropChannel(sid, cid)
except AttributeError:
# Server no longer exists
assert(current_mumble_server == None)
assert (current_mumble_server is None)
log.debug("(%d) Server for channel %d no longer exists. Dropped.", sid, cid)
self.db.dropChannel(sid, cid)
def disconnected(self): pass
def disconnected(self):
pass
def removeFromGroups(self, mumble_server, session, game, server, team):
"""
Removes the client from all relevant groups
@@ -153,16 +152,16 @@ class source(MumoModule):
sid = mumble_server.id()
prefix = self.cfg().source.groupprefix
game_cid = self.db.cidFor(sid, game)
group = prefix + game
mumble_server.removeUserFromGroup(game_cid, session, group) # Game
mumble_server.removeUserFromGroup(game_cid, session, group) # Game
group += "_" + server
mumble_server.removeUserFromGroup(game_cid, session, group) # Server
mumble_server.removeUserFromGroup(game_cid, session, group) # Server
group += "_" + str(team)
mumble_server.removeUserFromGroup(game_cid, session, group) # Team
mumble_server.removeUserFromGroup(game_cid, session, group) # Team
def addToGroups(self, mumble_server, session, game, server, team):
"""
Adds the client to all relevant groups
@@ -170,49 +169,48 @@ class source(MumoModule):
sid = mumble_server.id()
prefix = self.cfg().source.groupprefix
game_cid = self.db.cidFor(sid, game)
assert(game_cid != None)
assert (game_cid is not None)
group = prefix + game
mumble_server.addUserToGroup(game_cid, session, group) # Game
mumble_server.addUserToGroup(game_cid, session, group) # Game
group += "_" + server
mumble_server.addUserToGroup(game_cid, session, group) # Server
mumble_server.addUserToGroup(game_cid, session, group) # Server
group += "_" + str(team)
mumble_server.addUserToGroup(game_cid, session, group) # Team
mumble_server.addUserToGroup(game_cid, session, group) # Team
def transitionPresentUser(self, mumble_server, old, new, sid, user_new):
"""
Transitions a user that has been and is currently playing
"""
assert(new)
assert new
target_cid = self.getOrCreateTargetChannelFor(mumble_server, new)
if user_new:
self.dlog(sid, new.state, "User started playing: g/s/t %s/%s/%d", new.game, new.server, new.identity["team"])
self.dlog(sid, new.state, "User started playing: g/s/t %s/%s/%d", new.game, new.server,
new.identity["team"])
self.addToGroups(mumble_server, new.state.session, new.game, new.server, new.identity["team"])
else:
assert old
self.dlog(sid, old.state, "User switched: g/s/t %s/%s/%d", new.game, new.server, new.identity["team"])
self.removeFromGroups(mumble_server, old.state.session, old.game, old.server, old.identity["team"])
self.addToGroups(mumble_server, new.state.session, new.game, new.server, new.identity["team"])
return self.moveUser(mumble_server, new, target_cid)
return self.moveUser(mumble_server, new, target_cid)
def transitionGoneUser(self, mumble_server, old, new, sid):
"""
Transitions a user that played but is no longer doing so now.
"""
assert(old)
assert old
self.users.remove(sid, old.state.session)
if new:
self.removeFromGroups(mumble_server, old.state.session, old.game, old.server, old.identity["team"])
bcid = self.cfg().source.basechannelid
self.dlog(sid, old.state, "User stopped playing. Moving to %d.", bcid)
self.moveUserToCid(mumble_server, new.state, bcid)
@@ -220,7 +218,6 @@ class source(MumoModule):
self.dlog(sid, old.state, "User gone")
return True
def userLeftChannel(self, mumble_server, old, sid):
"""
User left channel. Make sure we check for vacancy it if the game it
@@ -247,37 +244,36 @@ class source(MumoModule):
"""
sid = mumble_server.id()
assert(not old or old.valid())
relevant = old or (new and new.valid())
assert (not old or old.valid())
relevant = old or (new and new.valid())
if not relevant:
return
user_new = not old and new and new.valid()
user_gone = old and (not new or not new.valid())
if not user_gone:
moved = self.transitionPresentUser(mumble_server, old, new, sid, user_new)
else:
moved = self.transitionGoneUser(mumble_server, old, new, sid)
if moved and old:
self.userLeftChannel(mumble_server, old, sid)
def getGameName(self, game):
"""
Returns the unexpanded game specific game name template.
"""
return self.getGameConfig(game, "name")
def getServerName(self, game):
"""
Returns the unexpanded game specific server name template.
"""
return self.getGameConfig(game, "servername")
def getTeamName(self, game, index):
"""
Returns the game specific team name for the given team index.
@@ -287,32 +283,31 @@ class source(MumoModule):
return self.getGameConfig(game, "teams")[index]
except IndexError:
return str(index)
def setACLsForGameChannel(self, mumble_server, game_cid, game):
"""
Sets the appropriate ACLs for a game channel for the given cid.
"""
# Shorthands
ACL = self.murmur.ACL
EAT = self.murmur.PermissionEnter | self.murmur.PermissionTraverse # Enter And Traverse
W = self.murmur.PermissionWhisper # Whisper
S = self.murmur.PermissionSpeak # Speak
EAT = self.murmur.PermissionEnter | self.murmur.PermissionTraverse # Enter And Traverse
W = self.murmur.PermissionWhisper # Whisper
S = self.murmur.PermissionSpeak # Speak
groupname = '~' + self.cfg().source.groupprefix + game
mumble_server.setACL(game_cid,
[ACL(applyHere = True, # Deny everything
applySubs = True,
userid = -1,
group = 'all',
deny = EAT | W | S),
ACL(applyHere = True, # Allow enter and traverse to players
applySubs = False,
userid = -1,
group = groupname,
allow = EAT)],
[], True)
[ACL(applyHere=True, # Deny everything
applySubs=True,
userid=-1,
group='all',
deny=EAT | W | S),
ACL(applyHere=True, # Allow enter and traverse to players
applySubs=False,
userid=-1,
group=groupname,
allow=EAT)],
[], True)
def setACLsForServerChannel(self, mumble_server, server_cid, game, server):
"""
@@ -320,20 +315,19 @@ class source(MumoModule):
"""
# Shorthands
ACL = self.murmur.ACL
EAT = self.murmur.PermissionEnter | self.murmur.PermissionTraverse # Enter And Traverse
W = self.murmur.PermissionWhisper # Whisper
S = self.murmur.PermissionSpeak # Speak
EAT = self.murmur.PermissionEnter | self.murmur.PermissionTraverse # Enter And Traverse
W = self.murmur.PermissionWhisper # Whisper
S = self.murmur.PermissionSpeak # Speak
groupname = '~' + self.cfg().source.groupprefix + game + "_" + server
mumble_server.setACL(server_cid,
[ACL(applyHere = True, # Allow enter and traverse to players
applySubs = False,
userid = -1,
group = groupname,
allow = EAT)],
[], True)
[ACL(applyHere=True, # Allow enter and traverse to players
applySubs=False,
userid=-1,
group=groupname,
allow=EAT)],
[], True)
def setACLsForTeamChannel(self, mumble_server, team_cid, game, server, team):
"""
@@ -341,19 +335,19 @@ class source(MumoModule):
"""
# Shorthands
ACL = self.murmur.ACL
EAT = self.murmur.PermissionEnter | self.murmur.PermissionTraverse # Enter And Traverse
W = self.murmur.PermissionWhisper # Whisper
S = self.murmur.PermissionSpeak # Speak
EAT = self.murmur.PermissionEnter | self.murmur.PermissionTraverse # Enter And Traverse
W = self.murmur.PermissionWhisper # Whisper
S = self.murmur.PermissionSpeak # Speak
groupname = '~' + self.cfg().source.groupprefix + game + "_" + server + "_" + str(team)
mumble_server.setACL(team_cid,
[ACL(applyHere = True, # Allow enter and traverse to players
applySubs = False,
userid = -1,
group = groupname,
allow = EAT | W | S)],
[], True)
[ACL(applyHere=True, # Allow enter and traverse to players
applySubs=False,
userid=-1,
group=groupname,
allow=EAT | W | S)],
[], True)
def getOrCreateGameChannelFor(self, mumble_server, game, server, sid, cfg, log, namevars):
"""
@@ -362,23 +356,22 @@ class source(MumoModule):
"""
sid = mumble_server.id()
game_cid = self.db.cidFor(sid, game)
if game_cid == None:
if game_cid is None:
game_channel_name = self.db.nameFor(sid, game,
default = (self.getGameName(game) % namevars))
default=(self.getGameName(game) % namevars))
log.debug("(%d) Creating game channel '%s' below %d", sid, game_channel_name, cfg.source.basechannelid)
game_cid = mumble_server.addChannel(game_channel_name, cfg.source.basechannelid)
self.db.registerChannel(sid, game_cid, game) # Make sure we don't have orphaned server channels around
self.db.registerChannel(sid, game_cid, game) # Make sure we don't have orphaned server channels around
self.db.unregisterChannel(sid, game, server)
if self.getGameConfig(game, "restrict"):
log.debug("(%d) Setting ACL's for new game channel (cid %d)", sid, game_cid)
self.setACLsForGameChannel(mumble_server, game_cid, game)
log.debug("(%d) Game channel created and registered (cid %d)", sid, game_cid)
return game_cid
def getOrCreateServerChannelFor(self, mumble_server, game, server, team, sid, log, namevars, game_cid):
"""
Helper function for getting or creating only the server channel. The game
@@ -386,43 +379,42 @@ class source(MumoModule):
server channel.
"""
server_cid = self.db.cidFor(sid, game, server)
if server_cid == None:
if server_cid is None:
server_channel_name = self.db.nameFor(sid, game, server,
default = self.getServerName(game) % namevars)
default=self.getServerName(game) % namevars)
log.debug("(%d) Creating server channel '%s' below %d", sid, server_channel_name, game_cid)
server_cid = mumble_server.addChannel(server_channel_name, game_cid)
self.db.registerChannel(sid, server_cid, game, server)
self.db.unregisterChannel(sid, game, server, team) # Make sure we don't have orphaned team channels around
self.db.unregisterChannel(sid, game, server, team) # Make sure we don't have orphaned team channels around
if self.getGameConfig(game, "restrict"):
log.debug("(%d) Setting ACL's for new server channel (cid %d)", sid, server_cid)
self.setACLsForServerChannel(mumble_server, server_cid, game, server)
log.debug("(%d) Server channel created and registered (cid %d)", sid, server_cid)
return server_cid
def getOrCreateTeamChannelFor(self, mumble_server, game, server, team, sid, log, server_cid):
"""
Helper function for getting or creating only the team channel. Game and
server channel must already exist. Returns the cid of the existing or
created team channel.
"""
team_cid = self.db.cidFor(sid, game, server, team)
if team_cid == None:
if team_cid is None:
team_channel_name = self.db.nameFor(sid, game, server, team,
default = self.getTeamName(game, team))
default=self.getTeamName(game, team))
log.debug("(%d) Creating team channel '%s' below %d", sid, team_channel_name, server_cid)
team_cid = mumble_server.addChannel(team_channel_name, server_cid)
self.db.registerChannel(sid, team_cid, game, server, team)
if self.getGameConfig(game, "restrict"):
log.debug("(%d) Setting ACL's for new team channel (cid %d)", sid, team_cid)
self.setACLsForTeamChannel(mumble_server, team_cid, game, server, team)
log.debug("(%d) Team channel created and registered (cid %d)", sid, team_cid)
return team_cid
@@ -435,16 +427,16 @@ class source(MumoModule):
sid = mumble_server.id()
cfg = self.cfg()
log = self.log()
namevars = {'game' : game,
'server' : server}
namevars = {'game': game,
'server': server}
game_cid = self.getOrCreateGameChannelFor(mumble_server, game, server, sid, cfg, log, namevars)
server_cid = self.getOrCreateServerChannelFor(mumble_server, game, server, team, sid, log, namevars, game_cid)
team_cid = self.getOrCreateTeamChannelFor(mumble_server, game, server, team, sid, log, server_cid)
return team_cid
def moveUserToCid(self, server, state, cid):
"""
Low level helper for moving a user to a channel known by its ID
@@ -452,7 +444,7 @@ class source(MumoModule):
self.dlog(server.id(), state, "Moving from channel %d to %d", state.channel, cid)
state.channel = cid
server.setState(state)
def getOrCreateTargetChannelFor(self, mumble_server, user):
"""
Returns the cid of the target channel for this user. If needed
@@ -462,8 +454,8 @@ class source(MumoModule):
user.game,
user.server,
user.identity["team"])
def moveUser(self, mumble_server, user, target_cid = None):
def moveUser(self, mumble_server, user, target_cid=None):
"""
Move user according to current game state.
@@ -477,21 +469,21 @@ class source(MumoModule):
server = user.server
team = user.identity["team"]
sid = mumble_server.id()
source_cid = state.channel
if target_cid == None:
if target_cid is None:
target_cid = self.getOrCreateChannelFor(mumble_server, game, server, team)
if source_cid != target_cid:
self.moveUserToCid(mumble_server, state, target_cid)
user.state.channel = target_cid
self.users.addOrUpdate(sid, state.session, user)
return True
return False
def deleteIfUnused(self, mumble_server, cid):
"""
Takes the cid of a server or team channel and checks if all
@@ -500,46 +492,46 @@ class source(MumoModule):
Note: Assumes tree structure
"""
sid = mumble_server.id()
log = self.log()
result = self.db.channelForCid(sid, cid)
if not result:
return False
_, _, cur_game, cur_server, cur_team = result
assert(cur_game)
assert cur_game
if not cur_server:
# Don't handle game channels
log.debug("(%d) Delete if unused on game channel %d, ignoring", sid, cid)
return False
server_channel_cid = None
relevant = self.db.channelsFor(sid, cur_game, cur_server)
for _, cur_cid, _, _, cur_team in relevant:
if cur_team == self.db.NO_TEAM:
server_channel_cid = cur_cid
if self.users.usingChannel(sid, cur_cid):
log.debug("(%d) Delete if unused: Channel %d in use", sid, cur_cid)
return False # Used
assert(server_channel_cid != None)
return False # Used
assert (server_channel_cid is not None)
# Unused. Delete server and children
log.debug("(%s) Channel %d unused. Will be deleted.", sid, server_channel_cid)
mumble_server.removeChannel(server_channel_cid)
return True
def isValidGameType(self, game):
return self.cfg().source.gameregex.match(game) != None
return self.cfg().source.gameregex.match(game) is not None
def isValidServer(self, game, server):
return self.getGameConfig(game, "serverregex").match(server) != None
return self.getGameConfig(game, "serverregex").match(server) is not None
def parseSourceContext(self, context):
"""
Parse source engine context string. Returns tuple with
@@ -549,19 +541,19 @@ class source(MumoModule):
try:
prefix, server = context.split('\x00')[0:2]
source, game = [s.strip() for s in prefix.split(':', 1)]
if source != "Source engine":
# Not a source engine context
return (None, None)
return None, None
if not self.isValidGameType(game) or not self.isValidServer(game, server):
return (None, None)
return (game, server)
except (AttributeError, ValueError),e:
return (None, None);
return None, None
return game, server
except (AttributeError, ValueError) as e:
return None, None
def parseSourceIdentity(self, identity):
"""
Parse comma separated source engine identity string key value pairs
@@ -574,31 +566,32 @@ class source(MumoModule):
d = {}
for k, v in [var.split(':', 1) for var in identity.split(';')]:
d[k] = int(v)
# Make sure mandatory values are present
if not "team" in d: return None
return d
if "team" not in d:
return None
return d
except (AttributeError, ValueError):
return None
def getGameConfig(self, game, variable):
"""
Return the game specific value for the given variable if it exists. Otherwise the generic value
"""
sectionname = "game:" + game
cfg = self.cfg()
if sectionname not in cfg:
return cfg.generic[variable]
return cfg[sectionname][variable]
def dlog(self, sid, state, what, *argc):
""" Debug log output helper for user state related things """
self.log().debug("(%d) (%d|%d) " + what, sid, state.session, state.userid, *argc)
def handle(self, server, new_state):
"""
Takes the updated state of the user and collects all
@@ -607,32 +600,32 @@ class source(MumoModule):
"""
sid = server.id()
session = new_state.session
self.dlog(sid, new_state, "Handle state change")
old_user = self.users.get(sid, session)
if old_user and not old_user.hasContextOrIdentityChanged(new_state):
# No change in relevant fields. Simply update state for reference
old_user.updateState(new_state)
self.dlog(sid, new_state, "State change irrelevant for plugin")
return
game, game_server = self.parseSourceContext(new_state.context)
identity = self.parseSourceIdentity(new_state.identity)
self.dlog(sid, new_state, "Context: %s -> '%s' / '%s'", repr(new_state.context), game, game_server)
self.dlog(sid, new_state, "Identity: %s -> '%s'", repr(new_state.identity), identity)
updated_user = User(new_state, identity, game, game_server)
self.dlog(sid, new_state, "Starting transition")
self.userTransition(server, old_user, updated_user)
self.dlog(sid, new_state, "Transition complete")
#
#--- Server callback functions
# --- Server callback functions
#
def userDisconnected(self, server, state, context=None):
"""
Handle disconnect to be able to delete unused channels
@@ -640,9 +633,9 @@ class source(MumoModule):
"""
sid = server.id()
session = state.session
self.userTransition(server, self.users.get(sid, session), None)
def userStateChanged(self, server, state, context=None):
"""
Default state change for user. Could be something uninteresting for
@@ -650,21 +643,21 @@ class source(MumoModule):
string change triggered by starting to play.
"""
self.handle(server, state)
def userConnected(self, server, state, context=None):
"""
First time we see the state for a user. userStateChanged behavior
applies.
"""
self.handle(server, state)
def channelRemoved(self, server, state, context=None):
"""
Updates internal accounting for channels controlled by the plugin.
"""
cid = state.id
sid = server.id()
self.log().debug("(%d) Channel %d removed.", sid, cid)
self.db.dropChannel(sid, cid)
@@ -680,17 +673,19 @@ class source(MumoModule):
_, _, game, server, team = channel
self.db.mapName(name, sid, game, server, team)
self.log().debug("(%d) Name mapping for channel %d updated to '%s'", sid, cid, name)
def userTextMessage(self, server, user, message, current=None): pass
def channelCreated(self, server, state, context=None): pass
def userTextMessage(self, server, user, message, current=None):
pass
def channelCreated(self, server, state, context=None):
pass
#
#--- Meta callback functions
# --- Meta callback functions
#
def started(self, server, context=None):
self.log().debug("Started")
def stopped(self, server, context=None):
self.log().debug("Stopped")

View File

@@ -1,4 +1,4 @@
#!/usr/bin/env python
#!/usr/bin/env python3
# -*- coding: utf-8
# Copyright (C) 2013 Stefan Hacker <dd0t@users.sourceforge.net>
@@ -29,22 +29,25 @@
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import unittest
import Queue
import config
import queue
import re
import logging
import source
import unittest
import config
from . import source
class InvalidChannelExceptionMock(Exception):
pass
class StateMock():
def __init__(self, cid = 0, session = 0, userid = -1):
def __init__(self, cid=0, session=0, userid=-1):
self.channel = cid
self.session = session
self.userid = userid
class ChannelStateMock():
def __init__(self, cid, name, parent, groups, acls):
self.id = cid
@@ -52,66 +55,68 @@ class ChannelStateMock():
self.parent = parent
self.groups = groups
self.acls = acls
class ServerMock():
def __init__(self, sid):
self.sid = sid
self._reset()
def id(self):
return self.sid
def _lastChannelID(self):
return self.uid
def addChannel(self, name, parent):
self.uid += 1
assert(not self.uid in self.channels)
assert (not self.uid in self.channels)
self.channels[self.uid] = ChannelStateMock(self.uid,
name,
parent,
{},
[])
return self.uid
def addUserToGroup(self, cid, session, group):
c = self._getChan(cid)
if session in c.groups:
c.groups[session].add(group)
else:
c.groups[session] = set([group])
c.groups[session] = {group}
def _getChan(self, cid):
if not cid in self.channels:
if cid not in self.channels:
raise InvalidChannelExceptionMock()
return self.channels[cid]
def getChannelState(self, cid):
return self._getChan(cid)
def setState(self, state):
self.user_state.append(state)
def setACL(self, cid, acls, groups, inherit):
c = self._getChan(cid)
c.acls = acls
def _reset(self):
self.uid = 1000
self.channels = {} # See addChannel
self.channels = {} # See addChannel
self.user_state = []
class ACLMock(object):
def __init__(self, applyHere, applySubs, userid, group, deny = 0, allow = 0):
def __init__(self, applyHere, applySubs, userid, group, deny=0, allow=0):
self.applyHere = applyHere
self.applySubs = applySubs
self.userid = userid
self.group = group
self.deny = deny
self.allow = allow
class MurmurMock(object):
InvalidChannelException = InvalidChannelExceptionMock
@@ -120,9 +125,9 @@ class MurmurMock(object):
PermissionTraverse = 2
PermissionWhisper = 4
PermissionSpeak = 8
def _reset(self): pass
def __init__(self):
pass
@@ -132,61 +137,63 @@ class MockACLHelper(object):
T = MurmurMock.PermissionTraverse
W = MurmurMock.PermissionWhisper
S = MurmurMock.PermissionSpeak
EAT = E | T
ALL = E|T|W|S
ALL = E | T | W | S
ACLS = MockACLHelper
class MetaMock():
def __init__(self):
#TODO: Create range of server (or even cretae them on demand)
self.servers = {1:ServerMock(1),
5:ServerMock(5),
10:ServerMock(10)}
self.s = self.servers[1] # Shorthand
# TODO: Create range of server (or even cretae them on demand)
self.servers = {1: ServerMock(1),
5: ServerMock(5),
10: ServerMock(10)}
self.s = self.servers[1] # Shorthand
def getServer(self, sid):
return self.servers.get(sid, None)
def _reset(self):
for server in self.servers.itervalues():
for server in self.servers.values():
server._reset()
class ManagerMock():
SERVERS_ALL = [-1]
def __init__(self):
self.q = Queue.Queue()
self.q = queue.Queue()
self.m = MurmurMock()
self.meta = MetaMock()
def getQueue(self):
return self.q
def getMurmurModule(self):
return self.m
def getMeta(self):
return self.meta
def subscribeServerCallbacks(self, callback, servers):
self.serverCB = {'callback' : callback, 'servers' : servers}
self.serverCB = {'callback': callback, 'servers': servers}
def subscribeMetaCallbacks(self, callback, servers):
self.metaCB = {'callback' : callback, 'servers' : servers}
self.metaCB = {'callback': callback, 'servers': servers}
class Test(unittest.TestCase):
def setUp(self):
self.mm = ManagerMock();
self.mserv = self.mm.meta.getServer(1)
testconfig = config.Config(None, source.source.default_config)
testconfig.source.database = ":memory:"
# As it is hard to create the read only config structure from
# hand use a spare one to steal from
spare = config.Config(None, source.source.default_config)
@@ -195,46 +202,46 @@ class Test(unittest.TestCase):
testconfig.__dict__['game:tf'].teams = ["Lobby", "Spectator", "Blue", "Red"]
testconfig.__dict__['game:tf'].serverregex = re.compile("^\[A-1:123\]$")
testconfig.__dict__['game:tf'].servername = "Test %(game)s %(server)s"
self.s = source.source("source", self.mm, testconfig)
self.mm.s = self.s
# Since we don't want to run threaded if we don't have to
# emulate startup to the derived class function
self.s.onStart()
self.s.connected()
# Critical test assumption
self.assertEqual(self.mm.metaCB['callback'], self.s)
self.assertEqual(self.mm.serverCB['callback'], self.s)
def resetDB(self):
self.s.db.reset()
def resetState(self):
self.resetDB()
self.mm.m._reset()
self.mm.meta._reset()
def tearDown(self):
self.s.disconnected()
self.s.onStop()
def testDefaultConfig(self):
self.resetState()
mm = ManagerMock()
INVALIDFORCEDEFAULT = ""
s = source.source("source", mm, INVALIDFORCEDEFAULT)
self.assertNotEqual(s.cfg(), None)
def testConfiguration(self):
self.resetState()
# Ensure the default configuration makes sense
self.assertEqual(self.mm.serverCB['servers'], self.mm.SERVERS_ALL)
self.assertEqual(self.mm.metaCB['servers'], self.mm.SERVERS_ALL)
self.assertEqual(self.s.cfg().source.basechannelid, 0)
self.assertEqual(self.s.cfg().generic.name, "%(game)s")
@@ -243,35 +250,35 @@ class Test(unittest.TestCase):
def testIdentityParser(self):
self.resetState()
expected = {"universe" : 1,
"account_type" : 2,
"id" : 3,
"instance" : 4,
"team" : 5}
expected = {"universe": 1,
"account_type": 2,
"id": 3,
"instance": 4,
"team": 5}
got = self.s.parseSourceIdentity("universe:1;account_type:2;id:00000003;instance:4;team:5")
self.assertDictEqual(expected, got)
got = self.s.parseSourceIdentity("universe:1;account_type:2;id:00000003;instance:4;")
self.assertEqual(got, None, "Required team variable missing")
self.assertEqual(self.s.parseSourceIdentity(None), None)
self.assertEqual(self.s.parseSourceIdentity(""), None)
self.assertEqual(self.s.parseSourceIdentity("whatever:4;dskjfskjdfkjsfkjsfkj"), None)
def testContextParser(self):
self.resetState()
none = (None, None)
self.assertEqual(self.s.parseSourceContext(None), none)
self.assertEqual(self.s.parseSourceContext(""), none)
self.assertEqual(self.s.parseSourceContext("whatever:4;skjdakjkjwqdkjqkj"), none)
expected = ("dod", "[A-1:2807761920(3281)]")
actual = self.s.parseSourceContext("Source engine: dod\x00[A-1:2807761920(3281)]\x00")
self.assertEqual(expected, actual)
expected = ("dod", "[0:1]")
actual = self.s.parseSourceContext("Source engine: dod\x00[0:1]\x00")
self.assertEqual(expected, actual)
@@ -279,266 +286,279 @@ class Test(unittest.TestCase):
expected = ("cstrike", "[0:1]")
actual = self.s.parseSourceContext("Source engine: cstrike\x00[0:1]\x00")
self.assertEqual(expected, actual)
actual = self.s.parseSourceContext("Source engine: fake\x00[A-1:2807761920(3281)]\x00")
self.assertEqual(none, actual)
actual = self.s.parseSourceContext("Source engine: cstrike\x0098vcv98re98ver98ver98v\x00")
self.assertEqual(none, actual)
# Check alternate serverregex
expected = ("tf", "[A-1:123]")
actual = self.s.parseSourceContext("Source engine: tf\x00[A-1:123]\x00")
self.assertEqual(expected, actual)
actual = self.s.parseSourceContext("Source engine: tf\x00[A-1:2807761920(3281)]\x00")
self.assertEqual(none, actual)
def checkACLThings(self, acls, things):
self.assertEqual(len(things), len(acls))
i = 0
for thing in things:
acl = acls[i]
for attr, val in thing.iteritems():
for attr, val in thing.items():
self.assertEqual(getattr(acl, attr), val)
i += 1
def testGetOrCreateChannelFor(self):
mumble_server = self.mserv
prev = mumble_server._lastChannelID()
game = "tf"; server = "[A-1:123]"; team = 3
game = "tf"
server = "[A-1:123]"
team = 3
cid = self.s.getOrCreateChannelFor(mumble_server, game, server, team)
self.assertEqual(3, cid - prev)
c = mumble_server.channels
self.assertEqual(c[prev + 1].parent, 0)
self.assertEqual(c[prev + 2].parent, prev + 1)
self.assertEqual(c[prev + 3].parent, prev + 2)
self.assertEqual(c[prev + 1].name, "Team Fortress 2")
self.assertEqual(c[prev + 2].name, "Test tf [A-1:123]")
self.assertEqual(c[prev + 3].name, "Red")
sid = mumble_server.id()
self.assertEqual(self.s.db.cidFor(sid, game), prev + 1);
self.assertEqual(self.s.db.cidFor(sid, game, server), prev + 2);
self.assertEqual(self.s.db.cidFor(sid, game, server, team), prev + 3);
self.assertEqual(self.s.db.cidFor(sid, game), prev + 1)
self.assertEqual(self.s.db.cidFor(sid, game, server), prev + 2)
self.assertEqual(self.s.db.cidFor(sid, game, server, team), prev + 3)
gotcid = self.s.getOrCreateChannelFor(mumble_server, game, server, team)
self.assertEqual(cid, gotcid)
c = mumble_server.channels
self.checkACLThings(c[prev + 3].acls, [{'group' : '~source_tf_[A-1:123]_3'}])
self.checkACLThings(c[prev + 2].acls, [{'group' : '~source_tf_[A-1:123]'}])
self.checkACLThings(c[prev + 3].acls, [{'group': '~source_tf_[A-1:123]_3'}])
self.checkACLThings(c[prev + 2].acls, [{'group': '~source_tf_[A-1:123]'}])
self.checkACLThings(c[prev + 1].acls, [{},
{'group' : '~source_tf'}])
#print self.s.db.db.execute("SELECT * FROM source").fetchall()
{'group': '~source_tf'}])
# print self.s.db.db.execute("SELECT * FROM source").fetchall()
def testGetGameName(self):
self.resetState()
self.assertEqual(self.s.getGameName("tf"), "Team Fortress 2")
self.assertEqual(self.s.getGameName("invalid"), "%(game)s");
self.assertEqual(self.s.getGameName("invalid"), "%(game)s")
def testGetServerName(self):
self.resetState()
self.assertEqual(self.s.getServerName("tf"), "Test %(game)s %(server)s")
self.assertEqual(self.s.getServerName("invalid"), "%(server)s");
self.assertEqual(self.s.getServerName("invalid"), "%(server)s")
def testGetTeamName(self):
self.resetState()
self.assertEqual(self.s.getTeamName("tf", 2), "Blue")
self.assertEqual(self.s.getTeamName("tf", 100), "100") #oob
self.assertEqual(self.s.getTeamName("tf", 100), "100") # oob
self.assertEqual(self.s.getTeamName("invalid", 2), "Team one")
self.assertEqual(self.s.getTeamName("invalid", 100), "100") #oob
self.assertEqual(self.s.getTeamName("invalid", 100), "100") # oob
def testValidGameType(self):
self.resetState()
self.assertTrue(self.s.isValidGameType("dod"))
self.assertTrue(self.s.isValidGameType("cstrike"))
self.assertTrue(self.s.isValidGameType("tf"))
self.assertFalse(self.s.isValidGameType("dodx"))
self.assertFalse(self.s.isValidGameType("xdod"))
self.assertFalse(self.s.isValidGameType(""))
def testValidServer(self):
self.resetState()
self.assertTrue(self.s.isValidServer("dod", "[A-1:2807761920(3281)]"))
self.assertFalse(self.s.isValidServer("dod", "A-1:2807761920(3281)]"))
self.assertFalse(self.s.isValidServer("dod", "[A-1:2807761920(3281)"))
self.assertFalse(self.s.isValidServer("dod", "[A-1:2807761920(3281)&]"))
self.assertTrue(self.s.isValidServer("tf", "[A-1:123]"))
self.assertFalse(self.s.isValidServer("tf", "x[A-1:123]"))
self.assertFalse(self.s.isValidServer("tf", "[A-1:123]x"))
def testMoveUser(self):
self.resetState()
mumble_server = self.mserv
user_state = StateMock()
prev = self.mserv._lastChannelID()
TEAM_BLUE = 2
TEAM_RED = 3
BASE_SID = 0
GAME_SID = prev + 1
SERVER_SID = prev + 2
TEAM_RED_SID = prev + 3
TEAM_BLUE_SID = prev + 4
user = source.User(user_state, {'team':TEAM_BLUE}, "tf", "[A-1:123]")
user = source.User(user_state, {'team': TEAM_BLUE}, "tf", "[A-1:123]")
self.s.moveUser(self.mserv, user)
c = mumble_server.channels
self.assertEqual(c[prev + 1].parent, BASE_SID)
self.assertEqual(c[prev + 2].parent, GAME_SID)
self.assertEqual(c[prev + 3].parent, SERVER_SID)
self.assertEqual(c[prev + 1].name, "Team Fortress 2")
self.assertEqual(c[prev + 2].name, "Test tf [A-1:123]")
self.assertEqual(c[prev + 3].name, "Blue")
self.assertEqual(len(c), 3)
self.assertEqual(user_state.channel, TEAM_RED_SID)
self.assertEqual(mumble_server.user_state[0], user_state)
user.identity['team'] = TEAM_RED
self.s.moveUser(self.mserv, user)
self.assertEqual(c[prev + 4].parent, SERVER_SID)
self.assertEqual(c[prev + 4].name, "Red")
self.assertEqual(len(c), 4)
self.assertEqual(user_state.channel, TEAM_BLUE_SID)
self.assertEqual(mumble_server.user_state[0], user_state)
def testValidateChannelDB(self):
self.resetState()
self.s.db.registerChannel(5, 6, "7")
self.s.db.registerChannel(5, 7, "7", "8")
self.s.db.registerChannel(5, 8, "7", "8", 9)
self.s.db.registerChannel(6, 9, "8", "9", 10)
self.s.db.registerChannel(5, 10, "7", "123", 9)
game = 'cstrike'; server = '[A123:123]'; team = 1
game = 'cstrike'
server = '[A123:123]'
team = 1
self.s.getOrCreateChannelFor(self.mserv, game, server, team)
self.s.validateChannelDB()
self.assertEqual(len(self.s.db.registeredChannels()), 3)
def testSetACLsForGameChannel(self):
self.resetState()
mumble_server = self.mserv
cid = mumble_server.addChannel("test", 1); game = "dod"
cid = mumble_server.addChannel("test", 1)
game = "dod"
self.s.setACLsForGameChannel(mumble_server, cid, game)
acls = mumble_server.channels[cid].acls
self.checkACLThings(acls, [{'applyHere' : True,
'applySubs' : True,
'userid' : -1,
'group' : 'all',
'deny' : ACLS.ALL,
'allow' : 0},
{'applyHere' : True,
'applySubs' : False,
'userid' : -1,
'group' : '~source_dod',
'deny' : 0,
'allow' : ACLS.EAT}])
self.checkACLThings(acls, [{'applyHere': True,
'applySubs': True,
'userid': -1,
'group': 'all',
'deny': ACLS.ALL,
'allow': 0},
{'applyHere': True,
'applySubs': False,
'userid': -1,
'group': '~source_dod',
'deny': 0,
'allow': ACLS.EAT}])
def testSetACLsForServerChannel(self):
self.resetState()
mumble_server = self.mserv
cid = mumble_server.addChannel("test", 1); game = "tf"; server = "[A-1:SomeServer]"
cid = mumble_server.addChannel("test", 1)
game = "tf"
server = "[A-1:SomeServer]"
self.s.setACLsForServerChannel(mumble_server, cid, game, server)
acls = mumble_server.channels[cid].acls
self.checkACLThings(acls, [{'applyHere' : True,
'applySubs' : False,
'userid' : -1,
'group' : '~source_tf_[A-1:SomeServer]',
'deny' : 0,
'allow' : ACLS.EAT}])
self.checkACLThings(acls, [{'applyHere': True,
'applySubs': False,
'userid': -1,
'group': '~source_tf_[A-1:SomeServer]',
'deny': 0,
'allow': ACLS.EAT}])
def testSetACLsForTeamChannel(self):
self.resetState()
mumble_server = self.mserv
cid = mumble_server.addChannel("test", 1); game = "tf"; server = "[A-1:SomeServer]"; team = 2
cid = mumble_server.addChannel("test", 1)
game = "tf"
server = "[A-1:SomeServer]"
team = 2
self.s.setACLsForTeamChannel(mumble_server, cid, game, server, team)
acls = mumble_server.channels[cid].acls
self.checkACLThings(acls, [{'applyHere' : True,
'applySubs' : False,
'userid' : -1,
'group' : '~source_tf_[A-1:SomeServer]_2',
'deny' : 0,
'allow' : ACLS.ALL}])
self.checkACLThings(acls, [{'applyHere': True,
'applySubs': False,
'userid': -1,
'group': '~source_tf_[A-1:SomeServer]_2',
'deny': 0,
'allow': ACLS.ALL}])
def testAddToGroups(self):
self.resetState()
mumble_server = self.mserv
prev = mumble_server._lastChannelID()
session = 10; game = 'cstrike'; server = '[A-1:12345]'; team = 1
session = 10
game = 'cstrike'
server = '[A-1:12345]'
team = 1
self.s.getOrCreateChannelFor(mumble_server, game, server, team)
# Test
self.s.addToGroups(mumble_server, session, game, server, team)
groups = mumble_server.channels[prev + 1].groups[session]
self.assertIn("source_cstrike", groups)
self.assertIn("source_cstrike_[A-1:12345]", groups)
self.assertIn("source_cstrike_[A-1:12345]_1", groups)
def testChannelNameMapping(self):
self.resetState()
mumble_server = self.mserv
game = 'cstrike'; server = '[A-1:12345]'; team = 1
game = 'cstrike'
server = '[A-1:12345]'
team = 1
self.s.getOrCreateChannelFor(mumble_server, game, server, team)
cids = []
for c in mumble_server.channels.itervalues():
for c in mumble_server.channels.values():
c.name = str(c.id)
self.s.channelStateChanged(mumble_server, c)
cids.append(c.id)
mumble_server._reset()
self.s.validateChannelDB()
self.assertEqual(len(mumble_server.channels), 0)
self.assertEqual(len(self.s.db.registeredChannels()), 0)
self.s.getOrCreateChannelFor(mumble_server, game, server, team)
for cid in cids:
self.assertEqual(mumble_server._getChan(cid).name, str(cid))
if __name__ == "__main__":
#logging.basicConfig(level = logging.DEBUG)
#import sys;sys.argv = ['', 'Test.testName']
unittest.main()
# logging.basicConfig(level = logging.DEBUG)
# import sys;sys.argv = ['', 'Test.testName']
unittest.main()

View File

@@ -1,4 +1,4 @@
#!/usr/bin/env python
#!/usr/bin/env python3
# -*- coding: utf-8
# Copyright (C) 2013 Stefan Hacker <dd0t@users.sourceforge.net>
@@ -34,31 +34,32 @@ class User(object):
User to hold state as well as parsed data fields in a
sane fashion.
"""
def __init__(self, state, identity=None, game=None, server=None):
self.state = state
self.identity = identity or {}
self.server = server
self.game = game
def valid(self):
"""
True if valid data is available for all fields
"""
return self.state and self.identity and self.server and self.game
def hasContextOrIdentityChanged(self, otherstate):
"""
Checks whether the given state diverges from this users's
"""
return self.state.context != otherstate.context or \
self.state.identity != otherstate.identity
self.state.identity != otherstate.identity
def updateState(self, state):
"""
Updates the state of this user
"""
self.state = state
def updateData(self, identity, game, server):
"""
Updates the data fields for this user
@@ -66,16 +67,17 @@ class User(object):
self.identity = identity
self.game = game
self.server = server
class UserRegistry(object):
"""
Registry to store User objects for given servers
and sessions.
"""
def __init__(self):
self.users = {} # {session:user, ...}
self.users = {} # {session:user, ...}
def get(self, sid, session):
"""
Return user or None from registry
@@ -84,34 +86,34 @@ class UserRegistry(object):
return self.users[sid][session]
except KeyError:
return None
def add(self, sid, session, user):
"""
Add new user to registry
"""
assert(isinstance(user, User))
assert (isinstance(user, User))
if not sid in self.users:
self.users[sid] = {session:user}
self.users[sid] = {session: user}
elif not session in self.users[sid]:
self.users[sid][session] = user
else:
return False
return True
def addOrUpdate(self, sid, session, user):
"""
Add user or overwrite existing one (identified by sid + session)
"""
assert(isinstance(user, User))
assert (isinstance(user, User))
if not sid in self.users:
self.users[sid] = {session:user}
self.users[sid] = {session: user}
else:
self.users[sid][session] = user
return True
def remove(self, sid, session):
"""
Remove user from registry
@@ -120,15 +122,14 @@ class UserRegistry(object):
del self.users[sid][session]
except KeyError:
return False
return True
return True
def usingChannel(self, sid, cid):
"""
Return true if any user in the registry is occupying the given channel
"""
for user in self.users[sid].itervalues():
for user in self.users[sid].values():
if user.state and user.state.channel == cid:
return True
return False
return False

View File

@@ -1,4 +1,4 @@
#!/usr/bin/env python
#!/usr/bin/env python3
# -*- coding: utf-8
# Copyright (C) 2013 Stefan Hacker <dd0t@users.sourceforge.net>
@@ -31,51 +31,55 @@
import unittest
from users import User, UserRegistry
from .users import User, UserRegistry
class Test(unittest.TestCase):
def getSomeUsers(self, n =5):
sid = []; session = []; user = []
def getSomeUsers(self, n=5):
sid = []
session = []
user = []
for i in range(n):
s=str(i)
sid.append(i) ; session.append(i)
user.append(User("state"+s, "identity"+s, "game"+s, "server"+s))
s = str(i)
sid.append(i)
session.append(i)
user.append(User("state" + s, "identity" + s, "game" + s, "server" + s))
return sid, session, user
def testRegistryCRUDOps(self):
r = UserRegistry()
sid, session, user = self.getSomeUsers()
# Create & Read
self.assertTrue(r.add(sid[0], session[0], user[0]))
self.assertFalse(r.add(sid[0], session[0], user[0]))
self.assertEqual(r.get(sid[0], session[0]), user[0])
self.assertTrue(r.addOrUpdate(sid[1], session[1], user[1]))
self.assertEqual(r.get(sid[1], session[1]), user[1])
# Update
self.assertTrue(r.addOrUpdate(sid[0], session[0], user[2]))
self.assertEqual(r.get(sid[0], session[0]), user[2])
# Delete
self.assertTrue(r.remove(sid[1], session[1]))
self.assertFalse(r.remove(sid[1], session[1]))
self.assertEqual(r.get(sid[1], session[1]), None)
self.assertTrue(r.remove(sid[0], session[0]))
self.assertFalse(r.remove(sid[0], session[0]))
self.assertEqual(r.get(sid[0], session[0]), None)
def testUser(self):
u = User("State", {'team':2} , "tf", "Someserver")
u = User("State", {'team': 2}, "tf", "Someserver")
self.assertTrue(u.valid())
self.assertFalse(User("State").valid())
if __name__ == "__main__":
#import sys;sys.argv = ['', 'Test.testName']
unittest.main()
# import sys;sys.argv = ['', 'Test.testName']
unittest.main()