86 lines
2.6 KiB
Python
86 lines
2.6 KiB
Python
try:
|
|
import wolframalpha
|
|
DEPENDENCIES_FOUND = True
|
|
except ImportError:
|
|
DEPENDENCIES_FOUND = False
|
|
|
|
from edmond.plugin import Plugin
|
|
|
|
|
|
class UnknownCommandPlugin(Plugin):
    """Handle unknown command by sending it to WolframAlpha.

    A public message is treated as a query when its last word equals the
    configured ``command_suffix``; everything before that word is sent to
    WolframAlpha and the plaintext of the returned pods is said back to the
    channel, truncated to ``MAX_LENGTH`` characters.
    """

    # NOTE(review): "command_suffix" is also read from self.config in
    # on_pubmsg but is not listed here; presumably it is provided by the
    # surrounding configuration -- verify.
    REQUIRED_CONFIGS = ["api_key", "max_pods"]
    # Upper bound on the length of a reply, cut mark included.
    MAX_LENGTH = 256
    # Appended to a reply that had to be shortened.
    CUT_MARK = "..."

    def __init__(self, bot):
        """Set the plugin priority and defer creation of the API client."""
        super().__init__(bot)
        self.priority = -6
        self._client = None  # Created lazily by the ``client`` property.

    @property
    def client(self):
        """Return the WolframAlpha client, creating it on first access.

        Returns None while no API key is configured.
        """
        if self._client is None and self.has_api_key():
            self._client = wolframalpha.Client(self.config["api_key"])
        return self._client

    def has_api_key(self):
        """Return True when the configured API key is not the empty string."""
        return self.config["api_key"] != ""

    def on_pubmsg(self, event):
        """Process a public message as a WolframAlpha query if it looks like one.

        Returns True when the message was taken as a query (whether or not
        the query succeeded), False otherwise.
        """
        # has_api_key is a method: it must be *called*. Testing the bound
        # method object itself is always truthy, which disabled this guard.
        if not self.has_api_key():
            return False
        message = self.should_read_message(event.arguments[0])
        if not message:
            return False
        words = message.split()
        if not words or words[-1] != self.config["command_suffix"]:
            return False

        # Everything before the trailing suffix word is the query.
        query = " ".join(words[:-1])
        self.process_query(query, event.target)
        return True

    def process_query(self, query, target):
        """Send ``query`` to WolframAlpha and say the result to ``target``.

        Any failure of the client call, or an unsuccessful response, is
        reported through ``self.signal_failure(target)``.
        """
        self.bot.log_d(f"Processing '{query}' with WolframAlpha.")
        try:
            response = self.client.query(query)
        except Exception as exc:
            # Capture any regular exception as this lib is not very stable,
            # but let KeyboardInterrupt/SystemExit propagate.
            self.bot.log_d(f"WolframAlpha query failed: {exc!r}")
            self.signal_failure(target)
            return

        if response["@success"] != "true":
            self.signal_failure(target)
            return

        inputs, answers = self._collect_texts(response)
        input_text = ", ".join(inputs)
        answer_text = ", ".join(answers)
        if input_text:
            reply = input_text + " -- " + answer_text
        else:
            reply = answer_text
        self.bot.say(target, self._truncate(reply))

    def _collect_texts(self, response):
        """Gather sanitized subpod texts, split into (inputs, answers).

        Subpods of the pod whose id is "Input" go to ``inputs``, all others
        to ``answers``. Collection stops once ``max_pods`` subpods have been
        read (at least one subpod is always read when any exists).
        """
        inputs = []
        answers = []
        limit = self.config["max_pods"]
        count = 0
        for pod in response.pods:
            bucket = inputs if pod["@id"] == "Input" else answers
            for subpod in pod.subpods:
                self.bot.log_d(f"WolframAlpha subpod: {subpod}")
                bucket.append(self.sanitize_text(subpod.plaintext or ""))
                count += 1
                if count >= limit:
                    return inputs, answers
        return inputs, answers

    @classmethod
    def _truncate(cls, reply):
        """Shorten ``reply`` to at most MAX_LENGTH characters.

        A reply that already fits is returned unchanged; a longer one is cut
        so that, with CUT_MARK appended, it is exactly MAX_LENGTH long.
        """
        if len(reply) <= cls.MAX_LENGTH:
            return reply
        return reply[:cls.MAX_LENGTH - len(cls.CUT_MARK)] + cls.CUT_MARK

    @staticmethod
    def sanitize_text(text):
        """Replace newlines with ", " so the text fits on a single line."""
        return text.replace("\n", ", ")
|