Introduce constraints and name mapping to source plugin database.

- Using default values now instead of NULL to signal empty field
  because NULL values are considered distinct in the context of
  UNIQUE table constraints in sqlite
- Introduced new table for mappings from sid, game, server and team
  to channel name i preparation for new functionality
- Updated and expanded unit test
This commit is contained in:
Stefan Hacker
2013-03-23 15:16:23 +01:00
parent 28195110c0
commit 0782eace33
5 changed files with 130 additions and 35 deletions

View File

@@ -34,6 +34,9 @@ import sqlite3
#TODO: Functions returning channels probably should return a dict instead of a tuple
class SourceDB(object):
NO_SERVER = ""
NO_TEAM = -1
def __init__(self, path = ":memory:"):
"""
Initialize the sqlite database in the given path. If no path
@@ -41,7 +44,26 @@ class SourceDB(object):
"""
self.db = sqlite3.connect(path)
if self.db:
self.db.execute("CREATE TABLE IF NOT EXISTS source(sid INTEGER, cid INTEGER, game TEXT, server TEXT, team INTEGER)")
self.db.execute("""
CREATE TABLE IF NOT EXISTS controlled_channels(
sid INTEGER NOT NULL,
cid INTEGER NOT NULL,
game TEXT NOT NULL,
server TEXT NOT NULL default "",
team INTEGER NOT NULL default -1,
UNIQUE(sid, cid),
PRIMARY KEY (sid, game, server, team)
)""")
self.db.execute("""
CREATE TABLE IF NOT EXISTS mapped_names (
sid INTEGER NOT NULL,
game TEXT NOT NULL,
server TEXT NOT NULL default "",
team INTEGER default -1,
name TEXT NOT NULL,
PRIMARY KEY (sid, game, server, team)
)""")
self.db.execute("VACUUM")
self.db.commit()
@@ -59,8 +81,32 @@ class SourceDB(object):
True if the database is correctly initialized
"""
return self.db != None
def nameFor(self, sid, game, server = NO_SERVER, team = NO_TEAM, default = ""):
"""
Returns the mapped name for the given parameters or default if no
mapping exists.
"""
assert(sid != None and game != None)
assert(not (team != self.NO_TEAM and server == self.NO_SERVER))
def cidFor(self, sid, game, server = None, team = None):
v = self.db.execute("SELECT name FROM mapped_names WHERE sid is ? and game is ? and server is ? and team is ?", [sid, game, server, team]).fetchone()
return v[0] if v else default
def mapName(self, name, sid, game, server = NO_SERVER, team = NO_TEAM):
"""
Stores a mapping for the given (sid, game, server, team) combination
to the given name. The mapping can then be retrieved with nameFor() in
the future.
"""
assert(sid != None and game != None)
assert(not (team != self.NO_TEAM and server == self.NO_SERVER))
self.db.execute("INSERT OR REPLACE into mapped_names (sid, game, server, team, name) VALUES (?,?,?,?,?)",[sid, game, server, team, name])
self.db.commit()
def cidFor(self, sid, game, server = NO_SERVER, team = NO_TEAM):
"""
Returns the channel id for game specific channel. If only game
is passed the game root channel cid is returned. If additionally
@@ -70,9 +116,9 @@ class SourceDB(object):
before None is returned.
"""
assert(sid != None and game != None)
assert(not (team != None and server == None))
assert(not (team != self.NO_TEAM and server == self.NO_SERVER))
v = self.db.execute("SELECT cid FROM source WHERE sid is ? and game is ? and server is ? and team is ?", [sid, game, server, team]).fetchone()
v = self.db.execute("SELECT cid FROM controlled_channels WHERE sid is ? and game is ? and server is ? and team is ?", [sid, game, server, team]).fetchone()
return v[0] if v else None
def channelForCid(self, sid, cid):
@@ -81,20 +127,20 @@ class SourceDB(object):
Returns None if the cid is unknown.
"""
assert(sid != None and cid != None)
return self.db.execute("SELECT sid, cid, game, server, team FROM source WHERE sid is ? and cid is ?", [sid, cid]).fetchone()
return self.db.execute("SELECT sid, cid, game, server, team FROM controlled_channels WHERE sid is ? and cid is ?", [sid, cid]).fetchone()
def channelFor(self, sid, game, server = None, team = None):
def channelFor(self, sid, game, server = NO_SERVER, team = NO_TEAM):
"""
Returns matching channel as (sid, cid, game, server, team) tuple. Matching
behavior is the same as for cidFor()
"""
assert(sid != None and game != None)
assert(not (team != None and server == None))
assert(not (team != self.NO_TEAM and server == self.NO_SERVER))
v = self.db.execute("SELECT sid, cid, game, server, team FROM source WHERE sid is ? and game is ? and server is ? and team is ?", [sid, game, server, team]).fetchone()
v = self.db.execute("SELECT sid, cid, game, server, team FROM controlled_channels WHERE sid is ? and game is ? and server is ? and team is ?", [sid, game, server, team]).fetchone()
return v
def channelsFor(self, sid, game, server = None, team = None):
def channelsFor(self, sid, game, server = NO_SERVER, team = NO_TEAM):
"""
Returns matching channels as a list of (sid, cid, game, server, team) tuples.
If only the game is passed all server and team channels are matched.
@@ -102,19 +148,19 @@ class SourceDB(object):
Returns empty list if no matches are found.
"""
assert(sid != None and game != None)
assert(not (team != None and server == None))
assert(not (team != self.NO_TEAM and server == self.NO_SERVER))
suffix, params = self.__whereClauseForOptionals(server, team)
return self.db.execute("SELECT sid, cid, game, server, team FROM source WHERE sid is ? and game is ?" + suffix, [sid, game] + params).fetchall()
return self.db.execute("SELECT sid, cid, game, server, team FROM controlled_channels WHERE sid is ? and game is ?" + suffix, [sid, game] + params).fetchall()
def registerChannel(self, sid, cid, game, server = None, team = None):
def registerChannel(self, sid, cid, game, server = NO_SERVER, team = NO_TEAM):
"""
Register a given channel with the database.
"""
assert(sid != None and game != None)
assert(not (team != None and server == None))
assert(not (team != self.NO_TEAM and server == self.NO_SERVER))
self.db.execute("INSERT INTO source (sid, cid, game, server, team) VALUES (?,?,?,?,?)", [sid, cid, game, server, team])
self.db.execute("INSERT INTO controlled_channels (sid, cid, game, server, team) VALUES (?,?,?,?,?)", [sid, cid, game, server, team])
self.db.commit()
return True
@@ -126,50 +172,55 @@ class SourceDB(object):
Returns (suffix, additional parameters) tuple
"""
if server != None and team != None:
if server != self.NO_SERVER and team != self.NO_TEAM:
return (" and server is ? and team is ?", [server, team])
elif server != None:
elif server != self.NO_SERVER:
return (" and server is ?", [server])
else:
return ("", [])
def unregisterChannel(self, sid, game, server = None, team = None):
def unregisterChannel(self, sid, game, server = NO_SERVER, team = NO_TEAM):
"""
Unregister a channel previously registered with the database.
"""
assert(sid != None and game != None)
assert(not (team != None and server == None))
assert(not (team != self.NO_TEAM and server == self.NO_SERVER))
suffix, params = self.__whereClauseForOptionals(server, team)
self.db.execute("DELETE FROM source WHERE sid is ? and game is ?" + suffix, [sid, game] + params)
self.db.execute("DELETE FROM controlled_channels WHERE sid is ? and game is ?" + suffix, [sid, game] + params)
self.db.commit()
def dropChannel(self, sid, cid):
"""
Drops channel with given sid + cid
"""
self.db.execute("DELETE FROM source WHERE sid is ? and cid is ?", [sid, cid])
assert(sid != None and cid != None)
self.db.execute("DELETE FROM controlled_channels WHERE sid is ? and cid is ?", [sid, cid])
self.db.commit()
def isRegisteredChannel(self, sid, cid):
"""
Returns true if a channel with given sid and cid is registered
"""
res = self.db.execute("SELECT cid FROM source WHERE sid is ? and cid is ?", [sid, cid]).fetchone()
assert(sid != None and cid != None)
res = self.db.execute("SELECT cid FROM controlled_channels WHERE sid is ? and cid is ?", [sid, cid]).fetchone()
return res != None
def registeredChannels(self):
"""
Returns channels as a list of (sid, cid, game, server team) tuples grouped by sid
"""
return self.db.execute("SELECT sid, cid, game, server, team FROM source ORDER by sid").fetchall()
return self.db.execute("SELECT sid, cid, game, server, team FROM controlled_channels ORDER by sid").fetchall()
def reset(self):
"""
Deletes everything in the database
"""
self.db.execute("DELETE FROM source")
self.db.execute("DELETE FROM mapped_names")
self.db.execute("DELETE FROM controlled_channels")
self.db.commit()
if __name__ == "__main__":
pass
pass

View File

@@ -31,6 +31,7 @@
import unittest
from db import SourceDB
import sqlite3
class SourceDBTest(unittest.TestCase):
def setUp(self):
@@ -133,8 +134,8 @@ class SourceDBTest(unittest.TestCase):
self.db.registerChannel(sid, tcid, game, server, team)
expected = [(sid, bcid, game, None, None),
(sid, scid, game, server, None),
expected = [(sid, bcid, game, self.db.NO_SERVER, self.db.NO_TEAM),
(sid, scid, game, server, self.db.NO_TEAM),
(sid, tcid, game, server, team),
(sid+1, tcid, game, server, team)]
@@ -164,10 +165,10 @@ class SourceDBTest(unittest.TestCase):
self.assertEqual(res, (sid, cid + 2, game, server, team))
res = self.db.channelFor(sid, game, server)
self.assertEqual(res, (sid, cid + 1, game, server, None))
self.assertEqual(res, (sid, cid + 1, game, server, self.db.NO_TEAM))
res = self.db.channelFor(sid, game)
self.assertEqual(res, (sid, cid, game, None, None))
self.assertEqual(res, (sid, cid, game, self.db.NO_SERVER, self.db.NO_TEAM))
res = self.db.channelFor(sid, game, server, team+5)
self.assertEqual(res, None)
@@ -180,11 +181,11 @@ class SourceDBTest(unittest.TestCase):
self.db.registerChannel(sid, cid+2, game, server, team)
res = self.db.channelForCid(sid, cid)
self.assertEqual(res, (sid, cid, game, None, None))
self.assertEqual(res, (sid, cid, game, self.db.NO_SERVER, self.db.NO_TEAM))
res = self.db.channelForCid(sid, cid + 1)
self.assertEqual(res, (sid, cid + 1, game, server, None))
self.assertEqual(res, (sid, cid + 1, game, server, self.db.NO_TEAM))
res = self.db.channelForCid(sid, cid + 2)
@@ -202,8 +203,8 @@ class SourceDBTest(unittest.TestCase):
self.db.registerChannel(sid, cid+2, game, server, team)
chans = ((sid, cid+2, game, server, team),
(sid, cid+1, game, server, None),
(sid, cid, game, None, None))
(sid, cid+1, game, server, self.db.NO_TEAM),
(sid, cid, game, self.db.NO_SERVER, self.db.NO_TEAM))
res = self.db.channelsFor(sid, game, server, team)
self.assertItemsEqual(res, chans[0:1])
@@ -216,6 +217,49 @@ class SourceDBTest(unittest.TestCase):
res = self.db.channelsFor(sid+1, game)
self.assertItemsEqual(res, [])
def testChannelTableConstraints(self):
self.db.reset()
# cid constraint
sid = 1; cid = 0; game = "tf"; server = "serv"; team = 0
self.db.registerChannel(sid, cid, game)
self.assertRaises(sqlite3.IntegrityError, self.db.registerChannel, sid, cid, "cstrike")
# combination constraint
self.assertRaises(sqlite3.IntegrityError, self.db.registerChannel, sid, cid+1000, game)
self.db.registerChannel(sid, cid+1, game, server)
self.assertRaises(sqlite3.IntegrityError, self.db.registerChannel, sid, cid+100, game, server)
self.db.registerChannel(sid, cid+2, game, server, team)
self.assertRaises(sqlite3.IntegrityError, self.db.registerChannel, sid, cid+200, game, server, team)
def testChannelNameMappingTableConstraints(self):
self.db.reset()
sid = 1; game = "tf"
# mapName performs an INSERT OR REPLACE which relies on the UNIQUE constraint
self.db.mapName("SomeTestName", sid, game)
self.db.mapName("SomeOtherName", sid, game)
self.assertEqual(self.db.nameFor(sid, game), "SomeOtherName")
def testNameMapping(self):
self.db.reset()
sid = 1; game = "tf"; server = "[12313]";team = 2
self.assertEqual(self.db.nameFor(sid, game, default = "test"), "test")
self.db.mapName("Game", sid, game)
self.db.mapName("Game Server", sid, game, server)
self.db.mapName("Game Server Team", sid, game, server, team)
self.db.mapName("Game Server Team 2", sid + 1, game, server, team)
self.db.mapName("Game Server Team 2", sid, "cstrike", server, team)
self.assertEqual(self.db.nameFor(sid, game), "Game")
self.assertEqual(self.db.nameFor(sid, game, server), "Game Server")
self.assertEqual(self.db.nameFor(sid, game, server, team), "Game Server Team")
if __name__ == "__main__":
#import sys;sys.argv = ['', 'Test.testName']

View File

@@ -57,7 +57,7 @@ class source(MumoModule):
('teams', commaSeperatedStrings, ["Lobby", "Spectator", "Team one", "Team two", "Team three", "Team four"]),
('restrict', x2bool, True),
('serverregex', re.compile, re.compile("^\[[\w\d\-\(\):]{1,20}\]$")),
('deleteifunused', x2bool, True)
('deleteifunused', x2bool, True)
)
default_config = {'source':(

View File

@@ -202,7 +202,7 @@ class Test(unittest.TestCase):
self.assertEqual(self.mm.serverCB['callback'], self.s)
def resetDB(self):
self.s.db.db.execute("DELETE FROM source");
self.s.db.db.reset()
def resetState(self):
self.resetDB()

View File

@@ -1,7 +1,7 @@
#!/usr/bin/env python
# -*- coding: utf-8
# Copyright (C) 2010-2011 Stefan Hacker <dd0t@users.sourceforge.net>
# Copyright (C) 2010-2013 Stefan Hacker <dd0t@users.sourceforge.net>
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without