You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

195 lines
7.2 KiB

import importlib
import json
import os
import time
import signal
from pathlib import Path
import irc.client # type: ignore
from irc.client import NickMask
from edmond.log import Logger
class Bot(irc.client.SimpleIRCClient, Logger):
    """IRC bot that forwards server events to its loaded plugins.

    The bot reads its settings from `config` (a mapping subscripted with
    the keys "nick", "alternative_nicks", "storage_file", "channels",
    "host", "port" and "speak_delay"), keeps runtime-only data in `values`
    and persistent data in `storage`, which is loaded from and saved to the
    JSON file named by `config["storage_file"]`.
    """

    # Key of `values` under which the list of joined channels is stored.
    CHANNELS_RUNTIME_KEY = "_channels"

    def __init__(self, config, logger):
        """Set up the bot state and load the persistent storage.

        `config` is the settings mapping described on the class; `logger`
        is stored as `self.logger` (presumably used by the Logger mixin's
        `log_*` helpers -- confirm in edmond.log).
        """
        super().__init__()
        self.config = config
        self.logger = logger
        self.plugins = []  # Plugin instances, filled by load_plugins().
        self.values = {}  # Runtime-only data, never written to disk.
        self.storage = self.__get_storage()
        self.done = False  # Set once cleanup() has completed.

    @property
    def nick(self):
        """Nickname validated by the server, or the configured nick."""
        if self.connection.is_connected():
            return self.connection.get_nickname()
        return self.config["nick"]

    @property
    def names(self):
        """Collection of names the bot should identify with."""
        return (self.nick, *self.config["alternative_nicks"])

    @property
    def channels(self):
        """List of joined channels (created empty on first access)."""
        return self.values.setdefault(self.CHANNELS_RUNTIME_KEY, [])

    def __get_storage(self):
        """Load data from the storage file.

        Returns the decoded JSON content, or an empty dict when the file
        cannot be read or decoded (an error and a warning are logged).
        """
        path = self.config["storage_file"]
        try:
            with open(path, "rt", encoding="utf-8") as storage_file:
                storage = json.load(storage_file)
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        except (OSError, ValueError) as exc:
            self.log_e(f"Could not load storage file: {exc}")
            self.log_w(
                "If it's not the first time Edm0nd is run, you may lose"
                " data when closing the program."
            )
            return {}
        self.log_d("Loaded storage file.")
        return storage

    def __save_storage(self):
        """Save storage data to disk; failures are logged, not raised."""
        # Serialise before opening the file: opening in "wt" mode truncates
        # it, so a value that cannot be serialised would otherwise destroy
        # the previously saved storage.
        try:
            content = json.dumps(self.storage, indent=2, sort_keys=True)
        except (TypeError, ValueError) as exc:
            self.log_e(f"Could not save storage file: {exc}")
            return
        path = self.config["storage_file"]
        try:
            with open(path, "wt", encoding="utf-8") as storage_file:
                storage_file.write(content)
        except OSError as exc:
            self.log_e(f"Could not save storage file: {exc}")
            return
        self.log_d("Saved storage file.")

    def on_welcome(self, connection, event):
        """Handle a successful connection to a server.

        Runs the plugin callbacks, then joins every configured channel.
        """
        self.log_i(f"Connected to server {event.source}.")
        self.run_plugin_callbacks(event)
        for channel in self.config["channels"]:
            connection.join(channel)

    def on_join(self, connection, event):
        """Handle someone, possibly the bot, joining a channel."""
        if event.source.nick == self.nick:
            self.log_i(f"Joined {event.target}.")
            # Do not track the same channel twice.
            if event.target not in self.channels:
                self.channels.append(event.target)
        self.run_plugin_callbacks(event)

    def on_part(self, connection, event):
        """Handle someone, possibly the bot, leaving a channel."""
        if event.source.nick == self.nick:
            # The part message is optional, so arguments may be empty.
            reason = event.arguments[0] if event.arguments else ""
            self.log_i(f"Left {event.target} (args: {reason}).")
            # Tolerate a PART for a channel that is not tracked.
            if event.target in self.channels:
                self.channels.remove(event.target)
        self.run_plugin_callbacks(event)

    def on_pubmsg(self, connection, event):
        """Handle a message received in a channel."""
        channel = event.target
        nick = NickMask(event.source).nick
        message = event.arguments[0]
        self.log_d(f"Message in {channel} from {nick}: {message}")
        self.run_plugin_callbacks(event)

    def on_privmsg(self, connection, event):
        """Handle a message received privately, usually like a channel msg."""
        nick = NickMask(event.source).nick
        target = event.target
        message = event.arguments[0]
        self.log_d(f"Private message from {nick} to {target}: {message}")
        self.run_plugin_callbacks(event)

    def on_ping(self, connection, event):
        """Handle a ping; can be used as a random event timer."""
        self.log_d(f"Received ping from {event.target}.")
        self.run_plugin_callbacks(event)

    def run(self):
        """Connect the bot to server, join channels and start responding.

        Blocks in the client loop until interrupted; cleanup() is always
        called when the loop ends, whatever the reason.
        """
        self.log_i("Starting Edmond.")
        self.load_plugins()
        self.log_i("Connecting to server.")
        self.connect(self.config["host"], self.config["port"], self.nick)
        signal.signal(signal.SIGTERM, self.handle_sigterm)
        try:
            self.start()
        except KeyboardInterrupt:
            self.log_i("Caught keyboard interrupt.")
        except Exception as exc:
            # Top-level boundary: log anything unexpected, then clean up.
            self.log_c(f"Caught unhandled exception: {exc}")
        finally:
            self.cleanup()

    def load_plugins(self):
        """Load all installed plugins.

        Every ".py" file of the "plugins" directory next to this file is
        imported as `edmond.plugins.<name>`; the plugin class is expected
        to be named after the module in CamelCase with a "Plugin" suffix
        (e.g. "foo_bar" -> "FooBarPlugin"). Plugins whose module sets
        DEPENDENCIES_FOUND to a false value, or lacks the expected class,
        are skipped with an error log.
        """
        self.log_i("Loading plugins.")
        plugins_dir = Path(__file__).parent / "plugins"
        # Sorted so the load order (which decides ties between plugins of
        # equal priority) does not depend on the filesystem. Dunder files
        # such as __init__.py are package plumbing, not plugins.
        plugin_names = sorted(
            os.path.splitext(file_name)[0]
            for file_name in os.listdir(plugins_dir)
            if file_name.endswith(".py") and not file_name.startswith("__")
        )
        for plugin_name in plugin_names:
            module = importlib.import_module(f"edmond.plugins.{plugin_name}")
            if not getattr(module, "DEPENDENCIES_FOUND", True):
                self.log_e(f"Dependencies not found for plugin {plugin_name}.")
                continue
            # Get plugin class name from its module name.
            words = plugin_name.split("_")
            class_name = "".join(word.capitalize() for word in words) + "Plugin"
            plugin_class = getattr(module, class_name, None)
            if plugin_class is None:
                self.log_e(
                    f"Class {class_name} not found for plugin {plugin_name}."
                )
                continue
            self.plugins.append(plugin_class(self))
            self.values[plugin_name] = {}
            self.log_d(f"Loaded {class_name}.")

    def get_plugin(self, name):
        """Get a loaded plugin by its name (e.g. 'mood'), or None."""
        return next((p for p in self.plugins if p.name == name), None)

    def say(self, target, message):
        """Send message to target after a slight delay.

        Line breaks are replaced with spaces, and a message starting with
        "/me " is sent as an action. Too long messages are logged and
        dropped.
        """
        message = message.replace("\n", " ").replace("\r", " ")
        time.sleep(self.config["speak_delay"])
        self.log_d(f"Sending to {target}: {message}")
        try:
            if message.startswith("/me "):
                self.connection.action(target, message[4:])
            else:
                self.connection.privmsg(target, message)
        except irc.client.MessageTooLong:
            self.log_e("Could not send, message is too long.")

    def run_plugin_callbacks(self, event):
        """Run appropriate callbacks for each plugin.

        Ready plugins are visited by decreasing priority; the first
        callback returning a true value stops the propagation.
        """
        etype = event.type
        ready_plugins = (p for p in self.plugins if p.is_ready)
        ordered = sorted(ready_plugins, key=lambda p: p.priority, reverse=True)
        for plugin in ordered:
            callbacks = plugin.callbacks
            if etype not in callbacks:
                continue
            if callbacks[etype](event):
                break

    def handle_sigterm(self, *args):
        """Handle SIGTERM (systemd stop, kill, etc): clean up and exit."""
        self.cleanup()
        # Raise SystemExit directly: the `exit` builtin is injected by the
        # `site` module and is not guaranteed to exist (python -S, frozen
        # builds). Message and exit status are the same as with exit().
        raise SystemExit("Exiting after received SIGTERM.")

    def cleanup(self):
        """Save the storage file and close the connection. Run only once."""
        if self.done:
            return
        self.log_i("Stopping Edmond.")
        self.__save_storage()
        if self.connection.is_connected():
            self.connection.close()
        self.done = True