Compare commits

..

10 commits

4 changed files with 40 additions and 181 deletions

View file

@ -147,9 +147,6 @@
"positive": ["I like it."],
"negative": ["I don't like it."]
},
"playlistoftheday": {
"commands": ["playlist of the day"]
},
"plus": {
"commands": ["plus"],
"aliases": {

View file

@ -46,21 +46,9 @@ class Plugin:
can disable itself by setting its own `is_ready` flag to false.
A plugin can access its config once the base `__init__` has been called.
The configuration is valid only if `is_ready` is True, else accessing its
The configuration is valid only is `is_ready` is True, else accessing its
content is undefined behaviour.
Plugins may use two types of special values, stored in two dicts: runtime
values and storage values. Runtime values can be accessed by other plugins
to get information about the runtime state of the bot. Currently it is used
to store if the bot is asleep or awake by the sleep plugin, and its current
mood by the mood plugin. They are lost when the bot shuts down. Storage
values on the other hand are written in a storage file everytime the object
is modified during runtime and also when the bot shuts down. Storage values
are stored as JSON, so values must be JSON-encodable. Storage values should
be used everytime information needs to persist over restarts instead of
external files. Access here means read and write, for both runtime and
storage values.
Plugins can have priorities and calling their callbacks will respect it.
For now these levels are used:
- 0: default
@ -120,90 +108,58 @@ class Plugin:
return not missing
def get_runtime_value(
self,
key: str,
default: Any = None,
ns: Optional[str] = None,
self, key: str, default: Any = None, ns: str = None
) -> Any:
"""Get a value from the plugin runtime dict.
This will get the value from the plugin namespace, but it is possible
to get runtime values from other plugins using their name as `ns`.
"""
name = ns or self.name
return self.bot.values[name].get(key, default)
if ns is None:
ns = self.name
return self.bot.values[ns].get(key, default)
def set_runtime_value(
self,
key: str,
value: Any,
ns: Optional[str] = None,
) -> None:
def set_runtime_value(self, key: str, value: Any) -> Any:
"""Set a value in the plugin runtime dict."""
name = ns or self.name
self.bot.values[name][key] = value
self.bot.values[self.name][key] = value
def get_storage_value(
self,
key: str,
default: Any = None,
ns: Optional[str] = None,
) -> Any:
def get_storage_value(self, key: str, default=None, ns: str = None) -> Any:
"""Get a value from the plugin persistent storage.
This will get the value from the plugin namespace, but it is possible
to get storage values from other plugins using their name as `ns`.
"""
name = ns or self.name
return self.bot.storage.get(name, {}).get(key, default)
if ns is None:
ns = self.name
return self.bot.storage.get(ns, {}).get(key, default)
def set_storage_value(
self,
key: str,
value: Any,
ns: Optional[str] = None,
skip_save: bool = False,
) -> None:
def set_storage_value(self, key: str, value: Any, ns: str = None) -> None:
"""Set a value in the plugin persistent storage."""
name = ns or self.name
if name not in self.bot.storage:
self.bot.storage[name] = {key: value}
else:
self.bot.storage[name][key] = value
if not skip_save:
self.bot.save_storage()
self.bot.save_storage()
def append_storage_list_value(
self,
key: str,
value: Any,
ns: str = None,
skip_save: bool = False,
) -> None:
def append_storage_list_value(self, key: str, value: Any) -> None:
"""Append a value to a list in the plugin persistent storage."""
name = ns or self.name
if name not in self.bot.storage:
self.bot.storage[name] = {key: [value]}
elif key not in self.bot.storage[name]:
self.bot.storage[name][key] = [value]
if self.name not in self.bot.storage:
self.bot.storage[self.name] = {key: [value]}
elif key not in self.bot.storage[self.name]:
self.bot.storage[self.name][key] = [value]
else:
self.bot.storage[name][key].append(value)
if not skip_save:
self.bot.save_storage()
self.bot.storage[self.name][key].append(value)
self.bot.save_storage()
def remove_storage_list_value(
self,
key: str,
value: Any,
ns: Optional[str] = None,
skip_save: bool = False,
) -> None:
def remove_storage_list_value(self, key: str, value: Any) -> None:
"""Remove a value from a persistent storage list."""
name = ns or self.name
if name in self.bot.storage and key in self.bot.storage[name]:
self.bot.storage[name][key].remove(value)
if not skip_save:
self.bot.save_storage()
if (
self.name in self.bot.storage
and key in self.bot.storage[self.name]
):
self.bot.storage[self.name][key].remove(value)
self.bot.save_storage()
def should_read_message(self, message: str) -> Optional[str]:
"""Return a message content if it has been addressed to me, else None.

View file

@ -1,66 +0,0 @@
import datetime
from edmond.plugin import Plugin
class PlaylistOfTheDayPlugin(Plugin):
"""Collect music links from other platforms.
Plugins that want to feed the playlist must do so by calling the add_link method of
the plugin. Later this plugin may feed its playlist using links unhandled by other,
more specialized plugins (bandcamp, soundcloud, ).
The current behaviour for the playlist is that links are added for a same day, so
the date at which the latest link was added is also stored. If a link is the first
to be added on a new day (by using this date as reference), the previous list is
emptied. But if the list is requested even if no links have been posted today and
the playlist is therefore outdated, it is still posted by the bot, along with a
message explaining that it's not the current day's playlist.
Note that the playlist itself is just of bunch of lines, which should of course
contain links, but can also contain titles and other metadata.
"""
REQUIRED_CONFIGS = ["commands"]
DATE_KEY = "date_of_latest_link"
PLAYLIST_KEY = "current_playlist"
def __init__(self, bot):
super().__init__(bot)
def on_pubmsg(self, event):
if not self.should_handle_command(event.arguments[0], no_content=True):
return False
# self.post_playlist(event.target)
return True
def add_line(self, line: str) -> None:
"""Add this line to the current playlist."""
today = datetime.date.today()
last_update_str = self.get_storage_value(self.DATE_KEY)
if last_update_str:
last_update_date = datetime.date.fromisoformat(last_update_str)
else:
last_update_date = today
if last_update_date == today:
current_playlist: list[str] = self.get_storage_value(
self.PLAYLIST_KEY,
default=[],
)
current_playlist.append(line)
else:
current_playlist = [line]
self.set_storage_value(
self.PLAYLIST_KEY,
current_playlist,
skip_save=True, # save one write
)
self.set_storage_value(
self.DATE_KEY,
today.isoformat(),
)
# def post_playlist(self):
# pass

View file

@ -1,16 +1,13 @@
import re
from typing import cast, Optional
try:
from googleapiclient.errors import Error as GoogleApiError # type: ignore
from googleapiclient.errors import Error as GoogleApiError
DEPENDENCIES_FOUND = True
except ImportError:
DEPENDENCIES_FOUND = False
from edmond.plugin import Plugin
from edmond.plugins.playlist_of_the_day import PlaylistOfTheDayPlugin
from edmond.plugins.youtube import YoutubePlugin
class YoutubeParserPlugin(Plugin):
@ -23,29 +20,14 @@ class YoutubeParserPlugin(Plugin):
def __init__(self, bot):
super().__init__(bot)
self.priority = -3
self._youtube_plugin: Optional[YoutubePlugin] = None
self._playlist_of_the_day_plugin: Optional[
PlaylistOfTheDayPlugin
] = None
self._youtube_plugin = None
@property
def youtube_plugin(self) -> Optional[YoutubePlugin]:
def youtube_plugin(self):
if self._youtube_plugin is None:
self._youtube_plugin = cast(
YoutubePlugin,
self.bot.get_plugin("youtube"),
)
self._youtube_plugin = self.bot.get_plugin("youtube")
return self._youtube_plugin
@property
def playlist_of_the_day_plugin(self) -> Optional[PlaylistOfTheDayPlugin]:
if self._playlist_of_the_day_plugin is None:
self._playlist_of_the_day_plugin = cast(
PlaylistOfTheDayPlugin,
self.bot.get_plugin("playlistoftheday"),
)
return self._playlist_of_the_day_plugin
def on_welcome(self, _):
if not (self.youtube_plugin and self.youtube_plugin.is_ready):
self.bot.log_w("Youtube plugin is not available.")
@ -58,22 +40,14 @@ class YoutubeParserPlugin(Plugin):
for word in words:
matched = self.VIDEO_URL_RE.match(word)
if matched:
title = self.get_video_title(matched)
if title:
self.bot.say(event.target, title)
self.add_to_playlist(word, title)
return True
else:
self.signal_failure(event.target)
return self.handle_match(matched, event.target)
return False
def get_video_title(self, matched) -> Optional[str]:
if self.youtube_plugin is None:
return None
def handle_match(self, matched, target):
groupdict = matched.groupdict()
code = groupdict.get("code1") or groupdict.get("code2")
if not code:
return None
return False
try:
search_response = (
self.youtube_plugin.youtube.videos()
@ -81,17 +55,15 @@ class YoutubeParserPlugin(Plugin):
.execute()
)
except GoogleApiError:
return None
self.signal_failure(target)
return False
title = ""
for result in search_response.get("items", []):
if result["kind"] == "youtube#video":
title = result["snippet"]["title"]
break
else:
return None
return title
def add_to_playlist(self, url: str, title: str):
if self.playlist_of_the_day_plugin is None:
return
self.playlist_of_the_day_plugin.add_line(f"{url} {title}")
self.signal_failure(target)
return False
self.bot.say(target, title)
return True