Files
mumo3/modules/chatparser.py

197 lines
7.1 KiB
Python

#!/usr/bin/env python
import re
import urllib
import amazon
import isodate
import requests
from amazon.api import AmazonAPI
from bs4 import BeautifulSoup
from mumo_module import (commaSeperatedIntegers,
MumoModule)
# YT
youtube_api_url = "https://www.googleapis.com/youtube/v3/videos?part=contentDetails%2C+snippet%2Cstatistics&id={0}&fields=pageInfo(totalResults)%2Citems(snippet(title)%2CcontentDetails(duration)%2Cstatistics(likeCount%2CdislikeCount))&key={1}"
youtube_regex = re.compile(
"(https?://)?(www\.)?(youtube|youtu|youtube-nocookie)\.(com|be)/(watch\?v=|embed/|v/|.+\?v=)?([^&=%\?]{11})")
# STEAM
steam_regex = re.compile("(http|https)://store.steampowered.com/app/([0-9]+)[/\[]?")
steam_genre_regex = re.compile("Genre:</b>.*?<a.*?>(.+?)</a>")
# AMAZON
amazon_regex = re.compile("ama?zo?n\.(?:de|com)/.*(?:dp|gp/product)/([A-Z0-9]{10})(?:(?:/+)|$)?")
# TODO: Make regex more general (de|com)
# Commands
move_regex = re.compile("^moveall (.*)$")
tsp_cmd_regex = re.compile("^!(.+?)(?: (.+?))?(?: (.*))?$")
# Stuff
headers = {
'User-Agent': "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/47.0.2526.49 Safari/537.36"
}
class chatparser(MumoModule):
default_config = {'chatparser': (
('servers', commaSeperatedIntegers, []),
('youtube_api_key', str, ''),
('amazon_access_key', str, ''),
('amazon_secret_key', str, ''),
('amazon_assoc_tag', str, ''),
('amazon_region', str, '')
)
}
def __init__(self, name, manager, configuration=None):
MumoModule.__init__(self, name, manager, configuration)
self.murmur = manager.getMurmurModule()
self.youtube_api_key = self.cfg().chatparser.youtube_api_key
self.amazon_access_key = self.cfg().chatparser.amazon_access_key
self.amazon_secret_key = self.cfg().chatparser.youtube_api_key
self.amazon_assoc_tag = self.cfg().chatparser.youtube_api_key
self.amazon_region = self.cfg().chatparser.amazon_region
self.amazon_api = AmazonAPI(self.amazon_access_key, self.amazon_secret_key,
self.amazon_assoc_tag, region=self.amazon_region)
# Load steam appid <-> game list
r = requests.get("http://api.steampowered.com/ISteamApps/GetAppList/v0002/?format=json")
if r.status_code == 200:
self.glist = r.json()
self.log().info("Got {} apps from steam".format(len(self.glist["applist"]["apps"])))
else:
self.glist = None
def connected(self):
manager = self.manager()
log = self.log()
log.debug("Register for Server callbacks")
servers = self.cfg().chatparser.servers
if not servers:
servers = manager.SERVERS_ALL
manager.subscribeServerCallbacks(self, servers)
def disconnected(self):
pass
def sendMessage(self, server, user, message, msg):
if message.channels:
server.sendMessageChannel(user.channel, False, msg)
else:
server.sendMessage(user.session, msg)
server.sendMessage(message.sessions[0], msg)
#
# --- Server callback functions
#
def userTextMessage(self, server, user, message, current=None):
if len(message.sessions) == 1 or (len(message.channels) == 1 and message.channels[0] == user.channel):
yt = youtube_regex.search(message.text)
if yt:
self.parse_youtube(yt.group(6), user, server, message)
return
stm = steam_regex.search(message.text)
if stm:
self.parse_steam(int(stm.group(2)), user, server, message)
return
amazon = amazon_regex.search(message.text)
if amazon:
self.parse_amazon(amazon.group(1), user, server, message)
def parse_youtube(self, yid, user, server, message):
ytparse = requests.get(youtube_api_url.format(yid, self.youtube_api_key))
self.log().info("YT: %s", yid)
if ytparse.status_code == 200:
if ytparse.json()["pageInfo"]["totalResults"] > 0:
v = ytparse.json()["items"][0]
t = isodate.parse_duration(v["contentDetails"]["duration"])
try:
rate = "%.2f %%" % (
(((float(v["statistics"]["dislikeCount"]) / float(
v["statistics"]["likeCount"])) - 1.0) * -1.0) * 100.0)
except ZeroDivisionError:
rate = "No rating possible"
self.sendMessage(server, user, message,
'<a href="https://youtu.be/{3}">{0}</a> | {1} | {2}'.format(
v["snippet"]["title"], t, rate, yid))
def parse_steam(self, appid, user, server, message):
self.log().info("STEAM: %s", str(appid))
steampage = requests.get("http://store.steampowered.com/app/" + str(appid) + "/?l=en", headers=headers)
price = None
genre = None
if steampage.status_code == 200:
soap = BeautifulSoup(steampage.text, "html.parser")
sprice = soap.find("div", {"class": "game_purchase_price price"})
if sprice:
price = sprice.get_text().strip()
else:
price = "Unkown Price"
reg_genre = steam_genre_regex.search(steampage.text)
if reg_genre:
genre = reg_genre.group(1)
else:
genre = "Unkown Genre"
if self.glist:
for game in self.glist["applist"]["apps"]:
if appid == game["appid"]:
self.sendMessage(server, user, message,
'<a href="http://store.steampowered.com/app/{0}">{1}</a> | {2} | {3}'.format(
appid, game["name"], genre, price))
return
def parse_amazon(self, item_id, user, server, message):
self.log().info("AMAZON: %s", item_id)
try:
product = self.amazon_api.lookup(ItemId=item_id)
except urllib.error.HTTPError as e:
self.log().warning("[AMAZON] request failure: %s", e)
product = None
except amazon.api.AsinNotFound as e2:
self.log().warning("[AMAZON] Article not found: %s", e2)
product = None
if product:
self.sendMessage(server, user, message,
'<a href="https://amazon.de/dp/{0}">{1}</a> | {2} € | {3}'.format(
item_id, product.title, product.price_and_currency[0],
product.get_attribute("ProductGroup")))
def userConnected(self, server, state, context=None):
pass
def userDisconnected(self, server, state, context=None):
pass
def userStateChanged(self, server, state, context=None):
pass
def channelCreated(self, server, state, context=None):
pass
def channelRemoved(self, server, state, context=None):
pass
def channelStateChanged(self, server, state, context=None):
pass