80 lines
2.5 KiB
Python
80 lines
2.5 KiB
Python
|
import wolframalpha
|
||
|
|
||
|
from edmond.plugin import Plugin
|
||
|
|
||
|
|
||
|
class WolframAlphaPlugin(Plugin):
|
||
|
"""Handle unknown command by sending it to WolframAlpha."""
|
||
|
|
||
|
REQUIRED_CONFIGS = ["commands", "api_key", "max_pods"]
|
||
|
MAX_LENGTH = 256
|
||
|
CUT_MARK = "..."
|
||
|
|
||
|
def __init__(self, bot):
|
||
|
super().__init__(bot)
|
||
|
self.priority = -6
|
||
|
self._client = None
|
||
|
|
||
|
@property
|
||
|
def client(self):
|
||
|
if self._client is None and self.has_api_key():
|
||
|
self._client = wolframalpha.Client(self.config["api_key"])
|
||
|
return self._client
|
||
|
|
||
|
def on_welcome(self, _):
|
||
|
if not self.config["api_key"]:
|
||
|
self.bot.log_w("API key unavailable.")
|
||
|
self.is_ready = False
|
||
|
|
||
|
def on_pubmsg(self, event):
|
||
|
if not self.should_handle_command(event.arguments[0]):
|
||
|
return False
|
||
|
self.process_query(self.command.content, event.target)
|
||
|
return True
|
||
|
|
||
|
def process_query(self, query, target):
|
||
|
self.bot.log_d(f"Processing '{query}' with WolframAlpha.")
|
||
|
try:
|
||
|
response = self.client.query(query)
|
||
|
except Exception as exc: # unstable lib
|
||
|
self.bot.log_w(f"wolframalpha exception: {exc}")
|
||
|
self.signal_failure(target)
|
||
|
return
|
||
|
|
||
|
if not response["@success"]:
|
||
|
self.bot.log_d("Call to WA succeeded but response is an error.")
|
||
|
self.signal_failure(target)
|
||
|
return
|
||
|
|
||
|
inputs = []
|
||
|
answers = []
|
||
|
num_pods = 0
|
||
|
for pod in response.pods:
|
||
|
for subpod in pod.subpods:
|
||
|
self.bot.log_d(f"WolframAlpha subpod: {subpod}")
|
||
|
if pod["@id"] == "Input":
|
||
|
inputs.append(self.sanitize_text(subpod.plaintext or ""))
|
||
|
else:
|
||
|
answers.append(self.sanitize_text(subpod.plaintext or ""))
|
||
|
num_pods += 1
|
||
|
if num_pods >= self.config["max_pods"]:
|
||
|
break
|
||
|
if num_pods >= self.config["max_pods"]:
|
||
|
break
|
||
|
|
||
|
input_text = ", ".join(inputs)
|
||
|
answer_text = ", ".join(answers)
|
||
|
if input_text:
|
||
|
reply = input_text + " -- " + answer_text
|
||
|
else:
|
||
|
reply = answer_text
|
||
|
if len(reply) > self.MAX_LENGTH - len(self.CUT_MARK):
|
||
|
reply = (
|
||
|
reply[: self.MAX_LENGTH - len(self.CUT_MARK)] + self.CUT_MARK
|
||
|
)
|
||
|
self.bot.say(target, reply)
|
||
|
|
||
|
@staticmethod
|
||
|
def sanitize_text(text):
|
||
|
return text.replace("\n", ", ")
|