Edm0nd/edmond/bot.py

193 lines
7.1 KiB
Python

import importlib
import json
import os
import time
import signal
from pathlib import Path
import irc.client
from irc.client import NickMask
from edmond.log import Logger
class Bot(irc.client.SimpleIRCClient, Logger):
CHANNELS_RUNTIME_KEY = "_channels"
def __init__(self, config, logger):
super().__init__()
self.config = config
self.logger = logger
self.plugins = []
self.values = {}
self.storage = self.__get_storage()
self.done = False
@property
def nick(self):
"""Nickname validated by the server, or the configured nick."""
if self.connection.is_connected():
return self.connection.get_nickname()
return self.config["nick"]
@property
def names(self):
"""Collection of names the bot should identify with."""
return (self.nick, *self.config["alternative_nicks"])
@property
def channels(self):
"""List of joined channels."""
if self.CHANNELS_RUNTIME_KEY not in self.values:
self.values[self.CHANNELS_RUNTIME_KEY] = []
return self.values[self.CHANNELS_RUNTIME_KEY]
def __get_storage(self):
"""Load data from storage."""
try:
with open(self.config["storage_file"], "rt") as storage_file:
storage = json.load(storage_file)
self.log_d("Loaded storage file.")
return storage
except (OSError, json.decoder.JSONDecodeError) as exc:
self.log_e(f"Could not load storage file: {exc}")
self.log_w("If it's not the first time Edm0nd is run, you may lose"
" data when closing the program.")
return {}
def __save_storage(self):
"""Save storage data to disk."""
try:
with open(self.config["storage_file"], "wt") as storage_file:
json.dump(self.storage, storage_file, indent=2, sort_keys=True)
self.log_d("Saved storage file.")
except (OSError, json.decoder.JSONEncodeError) as exc:
self.log_e(f"Could not save storage file: {exc}")
def on_welcome(self, connection, event):
"""Handle a successful connection to a server."""
self.log_i(f"Connected to server {event.source}.")
self.run_plugin_callbacks(event)
for channel in self.config["channels"]:
connection.join(channel)
def on_join(self, connection, event):
"""Handle someone, possibly the bot, joining a channel."""
if event.source.nick == self.nick:
self.log_i(f"Joined {event.target}.")
self.channels.append(event.target)
self.run_plugin_callbacks(event)
def on_part(self, connection, event):
"""Handle someone, possibly the bot, leaving a channel."""
if event.source.nick == self.nick:
self.log_i(f"Left {event.target} (args: {event.arguments[0]}).")
self.channels.remove(event.target)
self.run_plugin_callbacks(event)
def on_pubmsg(self, connection, event):
"""Handle a message received in a channel."""
channel = event.target
nick = NickMask(event.source).nick
message = event.arguments[0]
self.log_d(f"Message in {channel} from {nick}: {message}")
self.run_plugin_callbacks(event)
def on_privmsg(self, connection, event):
"""Handle a message received privately."""
nick = NickMask(event.source).nick
target = event.target
message = event.arguments[0]
self.log_d(f"Private message from {nick} to {target}: {message}")
self.run_plugin_callbacks(event)
def on_ping(self, connection, event):
"""Handle a ping; can be used as a random event timer."""
self.log_d(f"Received ping from {event.target}.")
self.run_plugin_callbacks(event)
def run(self):
"""Connect the bot to server, join channels and start responding."""
self.log_i("Starting Edmond.")
self.load_plugins()
self.log_i("Connecting to server.")
self.connect(self.config["host"], self.config["port"], self.nick)
signal.signal(signal.SIGTERM, self.handle_sigterm)
try:
self.start()
except KeyboardInterrupt:
self.log_i("Caught keyboard interrupt.")
except Exception as exc: # Yes, I know, who are you going to call?
self.log_c(f"Caught unhandled exception: {exc}")
finally:
self.cleanup()
def load_plugins(self):
"""Load all installed plugins."""
self.log_i("Loading plugins.")
plugin_files = os.listdir(Path(__file__).parent / "plugins")
plugin_names = map(
lambda f: os.path.splitext(f)[0],
filter(lambda f: f.endswith(".py"), plugin_files)
)
for plugin_name in plugin_names:
module = importlib.import_module(f"edmond.plugins.{plugin_name}")
are_dependencies_ok = getattr(module, "DEPENDENCIES_FOUND", True)
if not are_dependencies_ok:
self.log_e(f"Dependencies not found for plugin {plugin_name}.")
continue
# Get plugin class name from its module name.
class_name = "".join(map(
lambda w: w.capitalize(),
plugin_name.split("_")
)) + "Plugin"
plugin_class = getattr(module, class_name)
self.plugins.append(plugin_class(self))
self.values[plugin_name] = {}
self.log_d(f"Loaded {class_name}.")
def get_plugin(self, name):
"""Get a loaded plugin by its name (e.g. 'mood'), or None."""
return next(filter(lambda p: p.name == name, self.plugins), None)
def say(self, target, message):
"""Send message to target after a slight delay."""
message = message.replace("\n", " ").replace("\r", " ")
time.sleep(self.config["speak_delay"])
self.log_d(f"Sending to {target}: {message}")
try:
if message.startswith("/me "):
self.connection.action(target, message[4:])
else:
self.connection.privmsg(target, message)
except irc.client.MessageTooLong:
self.log_e("Could not send, message is too long.")
def run_plugin_callbacks(self, event):
"""Run appropriate callbacks for each plugin."""
etype = event.type
plugins = filter(lambda p: p.is_ready, self.plugins)
plugins = sorted(plugins, key=lambda p: p.priority, reverse=True)
for plugin in plugins:
callbacks = plugin.callbacks
if etype not in callbacks:
continue
if callbacks[etype](event):
break
def handle_sigterm(self, *args):
"""Handle SIGTERM (keyboard interrupt, systemd stop, etc)."""
self.cleanup()
exit("Exiting after received SIGTERM.")
def cleanup(self):
"""Save the storage file and close the connection. Run only once."""
if self.done:
return
self.log_i("Stopping Edmond.")
self.__save_storage()
if self.connection.is_connected():
self.connection.close()
self.done = True