Compare commits
13 commits
ae21c1ff8b
...
a7d3a18ea7
Author | SHA1 | Date | |
---|---|---|---|
dece | a7d3a18ea7 | ||
dece | e601a77f72 | ||
dece | 255cdcaac2 | ||
dece | 8d9ccd8dc9 | ||
dece | e8a18c6a90 | ||
dece | 0f31807909 | ||
dece | 801471efb2 | ||
dece | 33dab48706 | ||
dece | 95cc864474 | ||
dece | 5caa857df7 | ||
dece | bcdf7db830 | ||
dece | dfed9d5be5 | ||
dece | 1639ffb3ee |
|
@ -65,6 +65,9 @@
|
|||
"sentences": ["you're breathtaking"],
|
||||
"calm_rate": 100
|
||||
},
|
||||
"doupsland": {
|
||||
"commands": ["doupsland"]
|
||||
},
|
||||
"horoscope": {
|
||||
"commands": ["horoscope"],
|
||||
"meditation": "/me looks at the stars",
|
||||
|
@ -144,6 +147,16 @@
|
|||
"positive": ["I like it."],
|
||||
"negative": ["I don't like it."]
|
||||
},
|
||||
"playlistoftheday": {
|
||||
"commands": ["playlist of the day"]
|
||||
},
|
||||
"plus": {
|
||||
"commands": ["plus"],
|
||||
"aliases": {
|
||||
"plus": "more"
|
||||
},
|
||||
"shortcut": "+"
|
||||
},
|
||||
"randomchoice": {
|
||||
"commands": ["choose"],
|
||||
"separator": "or",
|
||||
|
|
112
edmond/plugin.py
112
edmond/plugin.py
|
@ -37,18 +37,30 @@ class Plugin:
|
|||
plugins. It can also save data using the Bot's storage feature to be
|
||||
available after a restart.
|
||||
|
||||
Initalisation should be very fast, no network connections or anything. They
|
||||
are initialised before connecting to the server, so their `is_ready` flag
|
||||
is set at that point. The loading order is more or less random, so a plugin
|
||||
cannot assume another has been loaded during initialisation. If it wants to
|
||||
interact with another plugin, the earliest point to do that is in the
|
||||
on_welcome callback which is called after connecting to a server, and can
|
||||
disable itself by setting its own `is_ready` flag to false.
|
||||
Initialisation should be very fast, no network connections or anything.
|
||||
They are initialised before connecting to the server, so their `is_ready`
|
||||
flag is set at that point. The loading order is more or less random, so a
|
||||
plugin cannot assume another has been loaded during initialisation. If it
|
||||
wants to interact with another plugin, the earliest point to do that is in
|
||||
the on_welcome callback which is called after connecting to a server, and
|
||||
can disable itself by setting its own `is_ready` flag to false.
|
||||
|
||||
A plugin can access its config once the base `__init__` has been called.
|
||||
The configuration is valid only is `is_ready` is True, else accessing its
|
||||
The configuration is valid only if `is_ready` is True, else accessing its
|
||||
content is undefined behaviour.
|
||||
|
||||
Plugins may use two types of special values, stored in two dicts: runtime
|
||||
values and storage values. Runtime values can be accessed by other plugins
|
||||
to get information about the runtime state of the bot. Currently it is used
|
||||
to store if the bot is asleep or awake by the sleep plugin, and its current
|
||||
mood by the mood plugin. They are lost when the bot shuts down. Storage
|
||||
values on the other hand are written in a storage file everytime the object
|
||||
is modified during runtime and also when the bot shuts down. Storage values
|
||||
are stored as JSON, so values must be JSON-encodable. Storage values should
|
||||
be used everytime information needs to persist over restarts instead of
|
||||
external files. Access here means read and write, for both runtime and
|
||||
storage values.
|
||||
|
||||
Plugins can have priorities and calling their callbacks will respect it.
|
||||
For now these levels are used:
|
||||
- 0: default
|
||||
|
@ -108,58 +120,90 @@ class Plugin:
|
|||
return not missing
|
||||
|
||||
def get_runtime_value(
|
||||
self, key: str, default: Any = None, ns: str = None
|
||||
self,
|
||||
key: str,
|
||||
default: Any = None,
|
||||
ns: Optional[str] = None,
|
||||
) -> Any:
|
||||
"""Get a value from the plugin runtime dict.
|
||||
|
||||
This will get the value from the plugin namespace, but it is possible
|
||||
to get runtime values from other plugins using their name as `ns`.
|
||||
"""
|
||||
if ns is None:
|
||||
ns = self.name
|
||||
return self.bot.values[ns].get(key, default)
|
||||
name = ns or self.name
|
||||
return self.bot.values[name].get(key, default)
|
||||
|
||||
def set_runtime_value(self, key: str, value: Any) -> Any:
|
||||
def set_runtime_value(
|
||||
self,
|
||||
key: str,
|
||||
value: Any,
|
||||
ns: Optional[str] = None,
|
||||
) -> None:
|
||||
"""Set a value in the plugin runtime dict."""
|
||||
self.bot.values[self.name][key] = value
|
||||
name = ns or self.name
|
||||
self.bot.values[name][key] = value
|
||||
|
||||
def get_storage_value(self, key: str, default=None, ns: str = None) -> Any:
|
||||
def get_storage_value(
|
||||
self,
|
||||
key: str,
|
||||
default: Any = None,
|
||||
ns: Optional[str] = None,
|
||||
) -> Any:
|
||||
"""Get a value from the plugin persistent storage.
|
||||
|
||||
This will get the value from the plugin namespace, but it is possible
|
||||
to get storage values from other plugins using their name as `ns`.
|
||||
"""
|
||||
if ns is None:
|
||||
ns = self.name
|
||||
return self.bot.storage.get(ns, {}).get(key, default)
|
||||
name = ns or self.name
|
||||
return self.bot.storage.get(name, {}).get(key, default)
|
||||
|
||||
def set_storage_value(self, key: str, value: Any, ns: str = None) -> None:
|
||||
def set_storage_value(
|
||||
self,
|
||||
key: str,
|
||||
value: Any,
|
||||
ns: Optional[str] = None,
|
||||
skip_save: bool = False,
|
||||
) -> None:
|
||||
"""Set a value in the plugin persistent storage."""
|
||||
name = ns or self.name
|
||||
if name not in self.bot.storage:
|
||||
self.bot.storage[name] = {key: value}
|
||||
else:
|
||||
self.bot.storage[name][key] = value
|
||||
self.bot.save_storage()
|
||||
if not skip_save:
|
||||
self.bot.save_storage()
|
||||
|
||||
def append_storage_list_value(self, key: str, value: Any) -> None:
|
||||
def append_storage_list_value(
|
||||
self,
|
||||
key: str,
|
||||
value: Any,
|
||||
ns: str = None,
|
||||
skip_save: bool = False,
|
||||
) -> None:
|
||||
"""Append a value to a list in the plugin persistent storage."""
|
||||
if self.name not in self.bot.storage:
|
||||
self.bot.storage[self.name] = {key: [value]}
|
||||
elif key not in self.bot.storage[self.name]:
|
||||
self.bot.storage[self.name][key] = [value]
|
||||
name = ns or self.name
|
||||
if name not in self.bot.storage:
|
||||
self.bot.storage[name] = {key: [value]}
|
||||
elif key not in self.bot.storage[name]:
|
||||
self.bot.storage[name][key] = [value]
|
||||
else:
|
||||
self.bot.storage[self.name][key].append(value)
|
||||
self.bot.save_storage()
|
||||
self.bot.storage[name][key].append(value)
|
||||
if not skip_save:
|
||||
self.bot.save_storage()
|
||||
|
||||
def remove_storage_list_value(self, key: str, value: Any) -> None:
|
||||
def remove_storage_list_value(
|
||||
self,
|
||||
key: str,
|
||||
value: Any,
|
||||
ns: Optional[str] = None,
|
||||
skip_save: bool = False,
|
||||
) -> None:
|
||||
"""Remove a value from a persistent storage list."""
|
||||
if (
|
||||
self.name in self.bot.storage
|
||||
and key in self.bot.storage[self.name]
|
||||
):
|
||||
self.bot.storage[self.name][key].remove(value)
|
||||
self.bot.save_storage()
|
||||
name = ns or self.name
|
||||
if name in self.bot.storage and key in self.bot.storage[name]:
|
||||
self.bot.storage[name][key].remove(value)
|
||||
if not skip_save:
|
||||
self.bot.save_storage()
|
||||
|
||||
def should_read_message(self, message: str) -> Optional[str]:
|
||||
"""Return a message content if it has been addressed to me, else None.
|
||||
|
|
|
@ -26,9 +26,12 @@ class CapturePlugin(Plugin):
|
|||
return False
|
||||
if self.current_thing is not None:
|
||||
message = event.arguments[0]
|
||||
if message == self.config["capture_sentence"]:
|
||||
capture_sentence = self.config["capture_sentence"]
|
||||
if message == capture_sentence:
|
||||
self.capture(event.source.nick, event.target)
|
||||
return True
|
||||
else:
|
||||
self.bot.log_d(f"Capture: “{message}” != “{capture_sentence}”")
|
||||
return False
|
||||
|
||||
if proc(self.config["rate"]):
|
||||
|
|
64
edmond/plugins/doupsland.py
Normal file
64
edmond/plugins/doupsland.py
Normal file
|
@ -0,0 +1,64 @@
|
|||
import random
|
||||
|
||||
from edmond.plugin import Plugin
|
||||
|
||||
|
||||
P_TIME = [
|
||||
"ce matin", "hier soir", "aujourd'hui", "tout à l'heure", "au réveil", "",
|
||||
]
|
||||
P_ACTION = [
|
||||
"j'ai aperçu", "j'ai caressé", "j'ai nadenade", "j'ai passé du temps avec",
|
||||
"je me suis arrêté vers", "je me suis promené avec", "j'ai salué",
|
||||
"j'ai approché", "j'ai suivi", "je me suis assis devant", "j'ai regardé",
|
||||
"j'ai parlé avec",
|
||||
]
|
||||
P_SUBJ = [
|
||||
"le chat", "le chat calico", "le chat noir", "le chaton",
|
||||
"le chat blanc", "le chat tigré", "le chat gris", "le chat avec le cœur",
|
||||
"le chat errant", "le vieux chat", "le gros chat", "le petit chat",
|
||||
"le chat doux", "le beau chat",
|
||||
]
|
||||
P_PLACE = [
|
||||
"en ville", "au port de pêche", "sur l'île", "sous l'arbre",
|
||||
"au monument de pierre", "sur la plage", "sur le chemin", "dans l'herbe",
|
||||
"devant l'école", "dans la petite cour",
|
||||
"qui miaulait", "qui dormait", "qui était devant la boutique",
|
||||
"qui voulait manger", "qui demandait le nadenade",
|
||||
"sur le quai",
|
||||
]
|
||||
P_COORD = [
|
||||
"et", "et donc", "et puis", "après quoi",
|
||||
]
|
||||
P_ACTION2 = [
|
||||
"il a miaulé", "il s'est endormi", "il m'a remercié",
|
||||
"il est resté avec moi",
|
||||
"il m'a suivi", "il a reclamé un nadenade", "il est monté sur le toît",
|
||||
"il s'est roulé en boule",
|
||||
]
|
||||
|
||||
|
||||
def get_title():
|
||||
return " ".join((
|
||||
random.choice(P_TIME),
|
||||
random.choice(P_ACTION),
|
||||
random.choice(P_SUBJ),
|
||||
random.choice(P_PLACE),
|
||||
random.choice(P_COORD),
|
||||
random.choice(P_ACTION2),
|
||||
)).strip().capitalize()
|
||||
|
||||
|
||||
class DoupslandPlugin(Plugin):
|
||||
|
||||
REQUIRED_CONFIGS = ["commands"]
|
||||
|
||||
def __init__(self, bot):
|
||||
super().__init__(bot)
|
||||
|
||||
def on_pubmsg(self, event):
|
||||
if not self.should_handle_command(event.arguments[0]):
|
||||
return False
|
||||
|
||||
reply = get_title()
|
||||
self.bot.say(event.target, reply)
|
||||
return True
|
66
edmond/plugins/playlist_of_the_day.py
Normal file
66
edmond/plugins/playlist_of_the_day.py
Normal file
|
@ -0,0 +1,66 @@
|
|||
import datetime
|
||||
|
||||
from edmond.plugin import Plugin
|
||||
|
||||
|
||||
class PlaylistOfTheDayPlugin(Plugin):
|
||||
"""Collect music links from other platforms.
|
||||
|
||||
Plugins that want to feed the playlist must do so by calling the add_link method of
|
||||
the plugin. Later this plugin may feed its playlist using links unhandled by other,
|
||||
more specialized plugins (bandcamp, soundcloud, …).
|
||||
|
||||
The current behaviour for the playlist is that links are added for a same day, so
|
||||
the date at which the latest link was added is also stored. If a link is the first
|
||||
to be added on a new day (by using this date as reference), the previous list is
|
||||
emptied. But if the list is requested even if no links have been posted today and
|
||||
the playlist is therefore outdated, it is still posted by the bot, along with a
|
||||
message explaining that it's not the current day's playlist.
|
||||
|
||||
Note that the playlist itself is just of bunch of lines, which should of course
|
||||
contain links, but can also contain titles and other metadata.
|
||||
"""
|
||||
|
||||
REQUIRED_CONFIGS = ["commands"]
|
||||
DATE_KEY = "date_of_latest_link"
|
||||
PLAYLIST_KEY = "current_playlist"
|
||||
|
||||
def __init__(self, bot):
|
||||
super().__init__(bot)
|
||||
|
||||
def on_pubmsg(self, event):
|
||||
if not self.should_handle_command(event.arguments[0], no_content=True):
|
||||
return False
|
||||
|
||||
# self.post_playlist(event.target)
|
||||
return True
|
||||
|
||||
def add_line(self, line: str) -> None:
|
||||
"""Add this line to the current playlist."""
|
||||
today = datetime.date.today()
|
||||
last_update_str = self.get_storage_value(self.DATE_KEY)
|
||||
if last_update_str:
|
||||
last_update_date = datetime.date.fromisoformat(last_update_str)
|
||||
else:
|
||||
last_update_date = today
|
||||
|
||||
if last_update_date == today:
|
||||
current_playlist: list[str] = self.get_storage_value(
|
||||
self.PLAYLIST_KEY,
|
||||
default=[],
|
||||
)
|
||||
current_playlist.append(line)
|
||||
else:
|
||||
current_playlist = [line]
|
||||
self.set_storage_value(
|
||||
self.PLAYLIST_KEY,
|
||||
current_playlist,
|
||||
skip_save=True, # save one write
|
||||
)
|
||||
self.set_storage_value(
|
||||
self.DATE_KEY,
|
||||
today.isoformat(),
|
||||
)
|
||||
|
||||
# def post_playlist(self):
|
||||
# pass
|
41
edmond/plugins/plus.py
Normal file
41
edmond/plugins/plus.py
Normal file
|
@ -0,0 +1,41 @@
|
|||
from edmond.plugin import Plugin
|
||||
|
||||
|
||||
class PlusPlugin(Plugin):
|
||||
"""Plugin to store additional content from other plugins.
|
||||
|
||||
This plugin does not do much on its own, it lets other plugins register
|
||||
additional content to compute on the fly when asked, e.g. the Wikipedia
|
||||
plugin can store the Web page of the definition they just gave so users
|
||||
wanting to know more about a definition can use the "plus" function to get
|
||||
the URL to the Web page.
|
||||
|
||||
There can be only one handle registered at one time by target (so by
|
||||
channel, user, etc). External plugins use the `add_handler` to set the
|
||||
current handler for a target.
|
||||
"""
|
||||
|
||||
REQUIRED_CONFIGS = ["commands"]
|
||||
|
||||
def __init__(self, bot):
|
||||
super().__init__(bot)
|
||||
self.handlers = {}
|
||||
self.shortcut = self.config.get("shortcut")
|
||||
|
||||
def on_pubmsg(self, event):
|
||||
if (
|
||||
(self.shortcut and event.arguments[0].strip() == self.shortcut)
|
||||
or self.should_handle_command(event.arguments[0], no_content=True)
|
||||
):
|
||||
self.process_handler(event)
|
||||
return True
|
||||
return False
|
||||
|
||||
def process_handler(self, event):
|
||||
if handler := self.handlers.pop(event.target, None):
|
||||
handler(event)
|
||||
else:
|
||||
self.signal_failure(event.target)
|
||||
|
||||
def add_handler(self, target: str, handler):
|
||||
self.handlers[target] = handler
|
|
@ -1,5 +1,7 @@
|
|||
import json
|
||||
import socket
|
||||
|
||||
from edmond.bot import Bot
|
||||
from edmond.plugin import Plugin
|
||||
|
||||
|
||||
|
@ -10,7 +12,7 @@ class ShrlokPlugin(Plugin):
|
|||
plugins to share longer texts through a Web portal.
|
||||
"""
|
||||
|
||||
def __init__(self, bot):
|
||||
def __init__(self, bot: Bot):
|
||||
super().__init__(bot)
|
||||
self.socket = ""
|
||||
self.file_root = ""
|
||||
|
@ -29,22 +31,18 @@ class ShrlokPlugin(Plugin):
|
|||
self.bot.log_d("No socket path specified, shrlok plugin disabled.")
|
||||
self.is_ready = False
|
||||
|
||||
def post(self, content_type, content_data):
|
||||
header = '{{"type":"{}"}}'.format(content_type).encode() + b"\0"
|
||||
def post(self, header: dict, data: bytes):
|
||||
encoded_header = json.dumps(header).encode()
|
||||
try:
|
||||
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
|
||||
sock.connect(self.socket)
|
||||
data = header + content_data
|
||||
sock.sendall(str(len(data)).encode() + b"\0" + data)
|
||||
header_and_data = encoded_header + b"\0" + data
|
||||
length = len(header_and_data)
|
||||
packet = str(length).encode() + b"\0" + header_and_data
|
||||
sock.sendall(packet)
|
||||
response = sock.recv(4096)
|
||||
except OSError as exc:
|
||||
self.bot.log_e(f"Can't post data: {exc}")
|
||||
return None
|
||||
url = response.decode().replace(self.file_root, self.url_root, 1)
|
||||
return url
|
||||
|
||||
def post_text(self, text):
|
||||
self.post("txt", text.encode())
|
||||
|
||||
def post_html(self, html):
|
||||
self.post("html", html.encode())
|
||||
|
|
|
@ -1,15 +1,20 @@
|
|||
import random
|
||||
import urllib.parse
|
||||
from typing import cast, Optional
|
||||
|
||||
import requests
|
||||
|
||||
from edmond.plugin import Plugin
|
||||
from edmond.plugins.shrlok import ShrlokPlugin
|
||||
|
||||
BASE_URL = "https://taxref.mnhn.fr/api"
|
||||
IMG_FETCH_HTML = """\
|
||||
<!doctype html>
|
||||
<html>
|
||||
<head><meta charset="UTF-8"/></head>
|
||||
<head>
|
||||
<meta charset="UTF-8"/>
|
||||
<style>img {{ display: block; max-width: 95%; }}</style>
|
||||
</head>
|
||||
<body></body>
|
||||
<script>
|
||||
const urls = [{}];
|
||||
|
@ -52,7 +57,7 @@ class TaxrefPlugin(Plugin):
|
|||
self.find_scientific_name(self.command.content, event.target)
|
||||
return True
|
||||
|
||||
def search_by_name(self, name, target):
|
||||
def search_by_name(self, name: str, target: str) -> None:
|
||||
"""Get species data from a scientific name.
|
||||
|
||||
Try to disambiguate the results by focusing on species only and their
|
||||
|
@ -116,7 +121,7 @@ class TaxrefPlugin(Plugin):
|
|||
if images_reply := self.get_images_reply(item_to_use):
|
||||
self.bot.say(target, images_reply)
|
||||
|
||||
def get_ambiguous_reply(self, items):
|
||||
def get_ambiguous_reply(self, items) -> str:
|
||||
"""Show a reply with potential species."""
|
||||
reply = self.config["ambiguous_reply"]
|
||||
append = ""
|
||||
|
@ -128,7 +133,7 @@ class TaxrefPlugin(Plugin):
|
|||
reply += append
|
||||
return reply
|
||||
|
||||
def get_images_reply(self, item):
|
||||
def get_images_reply(self, item) -> Optional[str]:
|
||||
"""If there are media available, return one in a message.
|
||||
|
||||
If shrlok is available, return a link to an HTML page shared by shrlok.
|
||||
|
@ -145,32 +150,40 @@ class TaxrefPlugin(Plugin):
|
|||
"""
|
||||
m_url = item.get("_links", {}).get("media", {}).get("href")
|
||||
if not m_url:
|
||||
self.bot.log_d("No media links.")
|
||||
return None
|
||||
response = requests.get(m_url)
|
||||
if response.status_code != 200:
|
||||
if (code := response.status_code) != 200:
|
||||
self.bot.log_d(f"Failed to reach media link ({code}).")
|
||||
return None
|
||||
media_data = response.json()
|
||||
items = media_data.get("_embedded", {}).get("media", [])
|
||||
if not items:
|
||||
self.bot.log_d("No media found in response.")
|
||||
return None
|
||||
|
||||
def get_img_url(item):
|
||||
def get_img_url(item) -> Optional[str]:
|
||||
return item.get("_links", {}).get("file", {}).get("href")
|
||||
|
||||
if shrlok := self.bot.get_plugin("shrlok"):
|
||||
if shrlok := cast(ShrlokPlugin, self.bot.get_plugin("shrlok")):
|
||||
if len(items) > 10:
|
||||
items = random.sample(items, 10)
|
||||
urls = map(get_img_url, items)
|
||||
urls_text = ",".join(map(lambda url: f'"{url}"', urls))
|
||||
html = IMG_FETCH_HTML.format(urls_text)
|
||||
link = shrlok.post_html(html)
|
||||
html = IMG_FETCH_HTML.format(urls_text).encode()
|
||||
link = shrlok.post({"type": "raw", "ext": "html"}, html)
|
||||
if not link:
|
||||
self.bot.log_d("shrlok plugin returned an empty string.")
|
||||
else:
|
||||
link = get_img_url(random.choice(items))
|
||||
if not link:
|
||||
self.bot.log_d("No link found.")
|
||||
|
||||
if link:
|
||||
return "📷 " + link
|
||||
return None
|
||||
|
||||
def find_scientific_name(self, name, target):
|
||||
def find_scientific_name(self, name: str, target: str):
|
||||
"""Find a corresponding scientific name for a vernacular name."""
|
||||
name = name.lower()
|
||||
enc_name = urllib.parse.quote(name)
|
||||
|
@ -195,7 +208,7 @@ class TaxrefPlugin(Plugin):
|
|||
else:
|
||||
# More than one result? For simplicity sake, use the shrlok plugin
|
||||
# if available or just show an ambiguous response.
|
||||
if shrlok := self.bot.get_plugin("shrlok"):
|
||||
if shrlok := cast(ShrlokPlugin, self.bot.get_plugin("shrlok")):
|
||||
text = (
|
||||
"\n".join(
|
||||
(
|
||||
|
@ -207,7 +220,7 @@ class TaxrefPlugin(Plugin):
|
|||
)
|
||||
+ "\n"
|
||||
)
|
||||
reply = shrlok.post_text(text)
|
||||
reply = shrlok.post({"type": "txt"}, text.encode())
|
||||
else:
|
||||
reply = self.get_ambiguous_reply(items)
|
||||
|
||||
|
|
|
@ -38,35 +38,49 @@ class WikipediaPlugin(Plugin):
|
|||
return True
|
||||
|
||||
def tell_random_summary(self, event):
|
||||
summary = ""
|
||||
page = None
|
||||
retries = self.NUM_RETRIES
|
||||
while retries > 0:
|
||||
try:
|
||||
summary = wikipedia.summary(wikipedia.random(), sentences=1)
|
||||
page = wikipedia.page(title=wikipedia.random())
|
||||
break
|
||||
except: # The wikipedia package can raise a lot of different stuff.
|
||||
pass
|
||||
except Exception as exc:
|
||||
# The wikipedia package can raise a lot of different stuff,
|
||||
# so we sort of have to catch broadly.
|
||||
self.bot.log_d(f"Wikipedia exception: {exc}")
|
||||
retries -= 1
|
||||
if summary:
|
||||
self.bot.say(event.target, summary)
|
||||
if page:
|
||||
if plus_plugin := self.bot.get_plugin("plus"):
|
||||
def handler(plus_event):
|
||||
self.bot.say(plus_event.target, page.url)
|
||||
plus_plugin.add_handler(event.target, handler)
|
||||
self.bot.say(event.target, page.summary)
|
||||
|
||||
def tell_definition(self, event):
|
||||
summary = ""
|
||||
page = None
|
||||
reply = ""
|
||||
retries = self.NUM_RETRIES
|
||||
while retries > 0:
|
||||
try:
|
||||
summary = wikipedia.summary(self.command.content, sentences=1)
|
||||
page = wikipedia.page(title=self.command.content)
|
||||
break
|
||||
except wikipedia.exceptions.DisambiguationError:
|
||||
summary = self.config["ambiguous_response"]
|
||||
reply = self.config["ambiguous_response"]
|
||||
break
|
||||
except wikipedia.exceptions.PageError:
|
||||
summary = self.config["empty_response"]
|
||||
reply = self.config["empty_response"]
|
||||
break
|
||||
except:
|
||||
summary = self.bot.config["error_message"]
|
||||
reply = self.bot.config["error_message"]
|
||||
# Keep trying after a slight delay.
|
||||
time.sleep(1)
|
||||
retries -= 1
|
||||
if summary:
|
||||
self.bot.say(event.target, summary)
|
||||
if page:
|
||||
reply = page.summary.split(". ", maxsplit=1)[0]
|
||||
if len(reply) > 200:
|
||||
reply = reply[:196] + " […]"
|
||||
if plus_plugin := self.bot.get_plugin("plus"):
|
||||
def handler(plus_event):
|
||||
self.bot.say(plus_event.target, page.url)
|
||||
plus_plugin.add_handler(event.target, handler)
|
||||
self.bot.say(event.target, reply)
|
||||
|
|
|
@ -1,13 +1,16 @@
|
|||
import re
|
||||
from typing import cast, Optional
|
||||
|
||||
try:
|
||||
from googleapiclient.errors import Error as GoogleApiError
|
||||
from googleapiclient.errors import Error as GoogleApiError # type: ignore
|
||||
|
||||
DEPENDENCIES_FOUND = True
|
||||
except ImportError:
|
||||
DEPENDENCIES_FOUND = False
|
||||
|
||||
from edmond.plugin import Plugin
|
||||
from edmond.plugins.playlist_of_the_day import PlaylistOfTheDayPlugin
|
||||
from edmond.plugins.youtube import YoutubePlugin
|
||||
|
||||
|
||||
class YoutubeParserPlugin(Plugin):
|
||||
|
@ -20,14 +23,29 @@ class YoutubeParserPlugin(Plugin):
|
|||
def __init__(self, bot):
|
||||
super().__init__(bot)
|
||||
self.priority = -3
|
||||
self._youtube_plugin = None
|
||||
self._youtube_plugin: Optional[YoutubePlugin] = None
|
||||
self._playlist_of_the_day_plugin: Optional[
|
||||
PlaylistOfTheDayPlugin
|
||||
] = None
|
||||
|
||||
@property
|
||||
def youtube_plugin(self):
|
||||
def youtube_plugin(self) -> Optional[YoutubePlugin]:
|
||||
if self._youtube_plugin is None:
|
||||
self._youtube_plugin = self.bot.get_plugin("youtube")
|
||||
self._youtube_plugin = cast(
|
||||
YoutubePlugin,
|
||||
self.bot.get_plugin("youtube"),
|
||||
)
|
||||
return self._youtube_plugin
|
||||
|
||||
@property
|
||||
def playlist_of_the_day_plugin(self) -> Optional[PlaylistOfTheDayPlugin]:
|
||||
if self._playlist_of_the_day_plugin is None:
|
||||
self._playlist_of_the_day_plugin = cast(
|
||||
PlaylistOfTheDayPlugin,
|
||||
self.bot.get_plugin("playlistoftheday"),
|
||||
)
|
||||
return self._playlist_of_the_day_plugin
|
||||
|
||||
def on_welcome(self, _):
|
||||
if not (self.youtube_plugin and self.youtube_plugin.is_ready):
|
||||
self.bot.log_w("Youtube plugin is not available.")
|
||||
|
@ -40,14 +58,22 @@ class YoutubeParserPlugin(Plugin):
|
|||
for word in words:
|
||||
matched = self.VIDEO_URL_RE.match(word)
|
||||
if matched:
|
||||
return self.handle_match(matched, event.target)
|
||||
title = self.get_video_title(matched)
|
||||
if title:
|
||||
self.bot.say(event.target, title)
|
||||
self.add_to_playlist(word, title)
|
||||
return True
|
||||
else:
|
||||
self.signal_failure(event.target)
|
||||
return False
|
||||
|
||||
def handle_match(self, matched, target):
|
||||
def get_video_title(self, matched) -> Optional[str]:
|
||||
if self.youtube_plugin is None:
|
||||
return None
|
||||
groupdict = matched.groupdict()
|
||||
code = groupdict.get("code1") or groupdict.get("code2")
|
||||
if not code:
|
||||
return False
|
||||
return None
|
||||
try:
|
||||
search_response = (
|
||||
self.youtube_plugin.youtube.videos()
|
||||
|
@ -55,15 +81,17 @@ class YoutubeParserPlugin(Plugin):
|
|||
.execute()
|
||||
)
|
||||
except GoogleApiError:
|
||||
self.signal_failure(target)
|
||||
return False
|
||||
return None
|
||||
title = ""
|
||||
for result in search_response.get("items", []):
|
||||
if result["kind"] == "youtube#video":
|
||||
title = result["snippet"]["title"]
|
||||
break
|
||||
else:
|
||||
self.signal_failure(target)
|
||||
return False
|
||||
self.bot.say(target, title)
|
||||
return True
|
||||
return None
|
||||
return title
|
||||
|
||||
def add_to_playlist(self, url: str, title: str):
|
||||
if self.playlist_of_the_day_plugin is None:
|
||||
return
|
||||
self.playlist_of_the_day_plugin.add_line(f"{url} {title}")
|
||||
|
|
Loading…
Reference in a new issue