dece
4480a3b44a
When a plugin callback successfully handles an event, it returns True and the bot stops processing further callbacks for that event. This may be changed later to allow multiple callbacks to handle the same event.
160 lines
5.8 KiB
Python
160 lines
5.8 KiB
Python
import importlib
|
|
import json
|
|
import os
|
|
import time
|
|
from pathlib import Path
|
|
|
|
import irc.client
|
|
from irc.client import NickMask
|
|
|
|
from edmond.log import Logger
|
|
|
|
|
|
class Bot(irc.client.SimpleIRCClient, Logger):
    """IRC client that forwards server events to dynamically loaded plugins.

    Instance attributes:
        config: mapping of settings; this class reads the keys "nick",
            "alternative_nicks", "storage_file", "channels", "host", "port"
            and "speak_delay".
        logger: logger object handed over by the caller.
        plugins: plugin instances created by `load_plugins`.
        values: runtime-only values; holds one dict per loaded plugin (keyed
            by the plugin module name) plus the joined channels list under
            `CHANNELS_RUNTIME_KEY`.
        storage: data loaded from the JSON storage file on construction and
            written back when the bot is stopped from `run`.
    """

    # Key of `values` under which the list of joined channels is kept.
    CHANNELS_RUNTIME_KEY = "_channels"

    def __init__(self, config, logger):
        """Store the configuration and logger, then load persistent storage."""
        super().__init__()
        self.config = config
        # The logger is assigned before loading the storage because
        # `_get_storage` emits log messages.
        self.logger = logger
        self.plugins = []
        self.values = {}
        self.storage = self._get_storage()

    @property
    def nick(self):
        """Nickname validated by the server, or the configured nick."""
        if self.connection.is_connected():
            return self.connection.get_nickname()
        return self.config["nick"]

    @property
    def names(self):
        """Collection of names the bot should identify with.

        Returns a tuple made of the current nick followed by every entry of
        the "alternative_nicks" configuration value.
        """
        return (self.nick, *self.config["alternative_nicks"])

    @property
    def channels(self):
        """List of joined channels, created in `values` on first access."""
        return self.values.setdefault(self.CHANNELS_RUNTIME_KEY, [])

    def _get_storage(self):
        """Load data from the JSON storage file.

        Returns the decoded content, or an empty dict when the file cannot
        be opened or parsed (the failure is logged, not raised).
        """
        try:
            with open(self.config["storage_file"], "rt") as storage_file:
                storage = json.load(storage_file)
            self.log_d("Loaded storage file.")
            return storage
        except (OSError, json.decoder.JSONDecodeError) as exc:
            self.log_e(f"Could not load storage file: {exc}")
            self.log_w("If it's not the first time Edm0nd is run, you may lose"
                       " data when closing the program.")
            return {}

    def _save_storage(self):
        """Save storage data to disk as indented, key-sorted JSON.

        Failures are logged instead of raised so that shutting down never
        crashes on a storage problem.
        """
        try:
            # Serialize before opening the file: opening in "wt" mode
            # truncates it, so a serialization failure must not be able to
            # wipe the previously saved data.
            serialized = json.dumps(self.storage, indent=2, sort_keys=True)
            with open(self.config["storage_file"], "wt") as storage_file:
                storage_file.write(serialized)
            self.log_d("Saved storage file.")
        # The json module has no encode-specific exception: unserializable
        # data raises TypeError or ValueError.
        except (OSError, TypeError, ValueError) as exc:
            self.log_e(f"Could not save storage file: {exc}")

    def on_welcome(self, connection, event):
        """Handle a successful connection to a server.

        Plugin callbacks run first, then every channel listed in the
        "channels" configuration value is joined.
        """
        self.log_i(f"Connected to server {event.source}.")
        self.run_plugin_callbacks(event)
        for channel in self.config["channels"]:
            connection.join(channel)

    def on_join(self, connection, event):
        """Handle someone, possibly the bot, joining a channel."""
        if event.source.nick == self.nick:
            self.log_i(f"Joined {event.target}.")
            # Avoid duplicate entries if the same channel is joined again
            # without a tracked departure in between.
            if event.target not in self.channels:
                self.channels.append(event.target)
        self.run_plugin_callbacks(event)

    def on_part(self, connection, event):
        """Handle someone, possibly the bot, leaving a channel."""
        if event.source.nick == self.nick:
            # The part message is optional in the IRC protocol, so the
            # arguments list may be empty.
            if event.arguments:
                self.log_i(
                    f"Left {event.target} (args: {event.arguments[0]})."
                )
            else:
                self.log_i(f"Left {event.target}.")
            # Tolerate a part from a channel that was never tracked.
            if event.target in self.channels:
                self.channels.remove(event.target)
        self.run_plugin_callbacks(event)

    def on_pubmsg(self, connection, event):
        """Handle a message received in a channel."""
        channel = event.target
        nick = NickMask(event.source).nick
        message = event.arguments[0]
        self.log_d(f"Message in {channel} from {nick}: {message}")
        self.run_plugin_callbacks(event)

    def on_privmsg(self, connection, event):
        """Handle a message received privately."""
        nick = NickMask(event.source).nick
        target = event.target
        message = event.arguments[0]
        self.log_d(f"Private message from {nick} to {target}: {message}")
        self.run_plugin_callbacks(event)

    def on_ping(self, connection, event):
        """Handle a ping; can be used as a random event timer."""
        self.log_d(f"Received ping from {event.target}.")
        self.run_plugin_callbacks(event)

    def run(self):
        """Connect the bot to server, join channels and start responding.

        Plugins are loaded before connecting. The call blocks in the client
        loop until interrupted from the keyboard, at which point the storage
        is saved to disk.
        """
        self.log_i("Starting Edmond.")
        self.load_plugins()
        self.connect(self.config["host"], self.config["port"], self.nick)
        try:
            self.start()
        except KeyboardInterrupt:
            self.log_i("Stopping Edmond.")
            self._save_storage()

    def load_plugins(self):
        """Load all installed plugins.

        Every ".py" file of the "plugins" directory located next to this
        module is imported as `edmond.plugins.<name>`. A module exposing a
        false `DEPENDENCIES_FOUND` attribute is skipped with an error log.
        The plugin class is looked up under the CamelCase form of the module
        name followed by "Plugin" (e.g. "my_thing" -> "MyThingPlugin") and
        instantiated with the bot as its only argument.
        """
        plugin_files = os.listdir(Path(__file__).parent / "plugins")
        plugin_names = [
            os.path.splitext(file_name)[0]
            for file_name in plugin_files
            if file_name.endswith(".py")
        ]
        for plugin_name in plugin_names:
            module = importlib.import_module(f"edmond.plugins.{plugin_name}")
            # Modules that do not define the flag are considered fine.
            are_dependencies_ok = getattr(module, "DEPENDENCIES_FOUND", True)
            if not are_dependencies_ok:
                self.log_e(f"Dependencies not found for plugin {plugin_name}.")
                continue
            # Get plugin class name from its module name.
            class_name = "".join(
                word.capitalize() for word in plugin_name.split("_")
            ) + "Plugin"
            plugin_class = getattr(module, class_name)
            self.plugins.append(plugin_class(self))
            # Fresh runtime values dict for this plugin.
            self.values[plugin_name] = {}
            self.log_d(f"Loaded {class_name}.")

    def say(self, target, message):
        """Send message to target after a slight delay.

        The delay is the "speak_delay" configuration value, in seconds. A
        message starting with "/me " is sent as an action, without that
        prefix; anything else is sent as a regular message.
        """
        time.sleep(self.config["speak_delay"])
        self.log_d(f"Sending to {target}: {message}")
        if message.startswith("/me "):
            self.connection.action(target, message[4:])
        else:
            self.connection.privmsg(target, message)

    def run_plugin_callbacks(self, event):
        """Run appropriate callbacks for each plugin.

        Plugins that are not ready or that have no callback registered for
        the event type are skipped. Processing stops at the first callback
        returning a true value, meaning it handled the event.
        """
        etype = event.type
        for plugin in self.plugins:
            if not plugin.is_ready:
                continue
            callbacks = plugin.callbacks
            if etype not in callbacks:
                continue
            if callbacks[etype](event):
                break
|