This repository has been archived on 2024-08-20. You can view files and clone it, but cannot push or open issues or pull requests.
Bebop/plugins/gopher/bebop_gopher/plugin.py

293 lines
10 KiB
Python
Raw Normal View History

import logging
import re
import socket
from enum import Enum
from typing import Optional
from bebop.browser.browser import Browser
from bebop.command_line import CommandLine
from bebop.downloads import get_download_path
from bebop.links import Links
from bebop.metalines import LineType
from bebop.mime import MimeType
from bebop.navigation import parse_url, parse_host_and_port, unparse_url
from bebop.page import Page
from bebop.plugins import PluginCommand, SchemePlugin
class ItemType(Enum):
    """Gopher item types, keyed by their one-character type code.

    Looking up a value that is not a known type code (for example
    ``ItemType("?")``) does not raise: it falls back to ``FILE``.
    """

    FILE = "0"
    DIR = "1"
    CCSO = "2"
    ERROR = "3"
    BINHEXED = "4"
    DOS = "5"
    UUENC = "6"
    SEARCH = "7"
    TELNET = "8"
    BINARY = "9"
    REDUNDANT = "+"
    TN3270 = "T"
    GIF = "g"
    IMAGE = "I"
    # These are not in the original RFC but encountered frequently.
    INFO = "i"
    DOC = "d"
    HTML = "h"
    SOUND = "s"

    @classmethod
    def _missing_(cls, value):
        """Enum lookup hook: treat any unknown type code as a FILE item."""
        return cls.FILE
# Item types whose response can be turned into a page (see `parse_source`).
PARSABLE_TYPES = (ItemType.FILE, ItemType.DIR)

# Item types this plugin does not render. They should be handed over to a
# separate program, but for now nothing at all is done with them.
UNHANDLED_TYPES = (
    ItemType.CCSO,
    ItemType.ERROR,
    ItemType.TELNET,
    ItemType.REDUNDANT,
    ItemType.TN3270,
)

# Lowercase member name -> item type, so that a type can easily be chosen by
# name from the command-line.
USER_FRIENDLY_TYPES = {member.name.lower(): member for member in ItemType}

# Icons displayed for some item types in a Gopher map.
# NOTE(review): ERROR and SEARCH map to empty strings here; an empty icon is
# falsy, so `ICONS.get(...) or fallback` lookups end up using the fallback.
# Possibly glyphs that were lost at some point -- confirm.
ICONS = {
    ItemType.FILE: "📄",
    ItemType.DIR: "📂",
    ItemType.ERROR: "",
    ItemType.SEARCH: "",
    ItemType.HTML: "🌐",
}

# Checks that a URL path follows RFC 4266 and carries an item type: group 1
# is the item type character, group 2 is the rest of the path.
TYPE_PATH_RE = re.compile(r"^/([\d\+TgIidhs])(.*)")
class GopherPluginException(Exception):
    """Error raised by the Gopher plugin, carrying a readable message.

    The message is available both as the `message` attribute and through the
    standard exception machinery (`str(exc)`, `exc.args`).
    """

    def __init__(self, message: str) -> None:
        # Forward the message so that str(exc), exc.args and tracebacks are
        # not empty.
        super().__init__(message)
        self.message = message
class GopherPlugin(SchemePlugin):
    """Plugin for the "gopher" scheme.

    Opens Gopher URLs (rendering text files and maps, downloading the rest)
    and provides the `set-item-type` command to re-parse the current page.
    """

    def __init__(self) -> None:
        super().__init__("gopher")
        self.commands = [
            PluginCommand(
                "set-item-type",
                "display current page as another item type (Gopher only)"
            )
        ]

    def open_url(self, browser: Browser, url: str) -> Optional[str]:
        """Request a selector from a Gopher host.

        As Bebop works only with URLs and not really the Gopher host/selector
        format, we use RFC 4266 (The gopher URI Scheme) for consistency with
        other schemes and to get that sweet item type hint in the URL path.

        Parsable item types are loaded as a page in the browser; other
        handled types are written to the download directory.

        Returns:
            The opened URL, rebuilt with explicit port and item type, or None
            if the URL could not be parsed, the item type is unhandled, the
            user gave no search input, the request failed or the download
            could not be saved.
        """
        parts = parse_url(url)
        host_and_port = parse_host_and_port(parts["netloc"], 70)
        if host_and_port is None:
            browser.set_status_error("Could not parse gopher URL.")
            return None
        host, port = host_and_port

        # Decode path; spaces in Gopher URLs are encoded for display in Bebop.
        path = parts["path"].replace("%20", " ")

        # If the URL has an item type, use it to properly parse the response.
        type_path_match = TYPE_PATH_RE.match(path)
        if type_path_match:
            item_type = ItemType(type_path_match.group(1))
            path = type_path_match.group(2)

            # Don't try to open a Telnet connection or other silly retro things.
            if item_type in UNHANDLED_TYPES:
                browser.set_status_error(f"Unhandled item {item_type.name}.")
                return None

            if item_type == ItemType.SEARCH:
                # Let user input some text for search items.
                user_input = browser.get_user_text_input(
                    "Input:",
                    CommandLine.CHAR_TEXT,
                    strip=True
                )
                if not user_input:
                    return None
                item_type = ItemType.DIR
                # Drop the search string left by a previous query, if any,
                # before appending the new one.
                previous_search_index = path.find("%09")
                if previous_search_index > -1:
                    path = path[:previous_search_index]
                path = f"{path}\t{user_input}"
            else:
                # URLs returned by this method encode tabs as %09 (see
                # below); decode them back so that re-opening such a URL,
                # e.g. a search result, sends a real tab to the server.
                path = path.replace("%09", "\t")

            # Note that we don't try to handle "h" items here because if the
            # URL actually uses http scheme, it should not end up in this
            # plugin.
        else:
            item_type = ItemType.DIR

        # If we have spaces in our path, encode it for UI & logging.
        encoded_path = path.replace(" ", "%20").replace("\t", "%09")
        browser.set_status(f"Loading {host} {port} '{encoded_path}'")

        timeout = browser.config["connect_timeout"]
        try:
            response = request(host, port, path, timeout)
        except GopherPluginException as exc:
            browser.set_status_error("Error: " + exc.message)
            return None

        url = f"gopher://{host}:{port}/{item_type.value}{encoded_path}"
        if item_type in PARSABLE_TYPES:
            page = parse_response(response, item_type)
            browser.load_page(page)
            browser.current_url = url
        else:
            # Anything that cannot be rendered is saved to disk instead.
            download_dir = browser.config["download_path"]
            filepath = get_download_path(url, download_dir=download_dir)
            try:
                with open(filepath, "wb") as download_file:
                    download_file.write(response)
            except OSError as exc:
                browser.set_status_error(f"Failed to save {url} ({exc})")
                return None
            else:
                browser.set_status(f"Downloaded {url}.")
                browser.last_download = None, filepath
        return url

    def use_command(self, browser: Browser, name: str, text: str):
        """Run the plugin command `name`; `text` is the full command text.

        Only "set-item-type" is handled. Its argument is whatever follows the
        command name in `text` and must be the lowercase name of an item type
        that is not unhandled; otherwise the valid names are shown as an
        error in the status bar.
        """
        if name != "set-item-type":
            return

        given_type = text[len(name):].strip()
        valid_types = [
            type_name
            for type_name, candidate in USER_FRIENDLY_TYPES.items()
            if candidate not in UNHANDLED_TYPES
        ]
        if given_type not in valid_types:
            error = "Valid types: " + ", ".join(valid_types)
            browser.set_status_error(error)
            return
        self.set_item_type(browser, USER_FRIENDLY_TYPES[given_type])

    def set_item_type(self, browser: Browser, item_type: ItemType):
        """Re-parse the current page using this item type.

        Does nothing but report an error if the browser is not currently
        showing a page opened with this plugin's scheme.
        """
        if browser.current_scheme != self.scheme or not browser.current_page:
            browser.set_status_error("Can only set item types on Gopher URLs.")
            return

        logging.debug("Force parsing current page as %s", item_type)
        current_source = browser.current_page.source
        new_page = get_page_from_source(current_source, item_type)
        browser.load_page(new_page)

        # If possible, set the correct item type in the URL path as well.
        parts = parse_url(browser.current_url)
        type_path_match = TYPE_PATH_RE.match(parts["path"])
        if type_path_match:
            path = type_path_match.group(2)
            parts["path"] = f"/{item_type.value}{path}"
            browser.current_url = unparse_url(parts)
def request(host: str, port: int, path: str, timeout: int) -> bytes:
    """Send a Gopher request and return the received bytes.

    The selector `path` is sent followed by CRLF, then the response is read
    until the server closes the connection. If a read times out, whatever
    has been received so far is returned. The socket is always closed before
    returning or raising.

    Raises:
        GopherPluginException: if the connection cannot be established, the
            path cannot be encoded or the request cannot be sent.
    """
    try:
        sock = socket.create_connection((host, port), timeout=timeout)
    except OSError as exc:
        raise GopherPluginException("failed to establish connection") from exc

    # The context manager closes the socket on every exit path.
    with sock:
        try:
            request_bytes = path.encode() + b"\r\n"
        except ValueError as exc:
            raise GopherPluginException("could not encode path") from exc

        try:
            sock.sendall(request_bytes)
        except OSError as exc:
            raise GopherPluginException("failed to send request") from exc

        chunks = []
        while True:
            try:
                buf = sock.recv(4096)
            except socket.timeout:
                # Keep what we got so far rather than failing the request.
                break
            if not buf:
                # Empty read: the server closed the connection.
                break
            chunks.append(buf)

    return b"".join(chunks)
def parse_response(
    response: bytes, item_type: ItemType, encoding: str = "utf8"
):
    """Decode a raw Gopher response and build a page out of it.

    Bytes that are not valid in `encoding` are replaced instead of raising;
    the decoded text is then handed to `get_page_from_source`.
    """
    text = response.decode(encoding, "replace")
    return get_page_from_source(text, item_type)
def get_page_from_source(source: str, item_type: ItemType):
    """Build a Page from already decoded source text.

    The text is parsed according to `item_type`; the Page is created from the
    source itself plus the metalines and links produced by `parse_source`.
    """
    return Page(source, *parse_source(source, item_type))
def parse_source(source: str, item_type: ItemType):
    """Generate metalines and a Links instance for this source text.

    The item_type must be a type that can be parsed: FILE or DIR. Any other
    item type will silently result in no metalines.

    A metaline is a `(meta, text)` tuple where `meta` is a dict holding at
    least a "type" key; link metalines also hold "url" and "link" (the link
    ID, starting at 1).

    Returns:
        A `(metalines, links)` tuple.
    """
    metalines = []
    links = Links()

    if item_type == ItemType.FILE:
        for line in source.split("\n"):
            line = line.rstrip("\r")
            metalines.append(({"type": LineType.PARAGRAPH}, line))

    # Gopher maps are kind of the default here, so it should be quite safe to
    # parse any kind of text data.
    elif item_type == ItemType.DIR:
        current_link_id = 1
        # Split lines on \n and discard \r separately because some maps do not
        # end lines with \r\n all the time.
        for line in source.split("\n"):
            line = line.rstrip("\r")
            ltype, tline = line[:1], line[1:]
            # Stop at a line made of a single dot.
            if ltype == "." and not tline:
                break

            parts = tline.split("\t")
            if len(parts) < 4:
                # TODO move me away
                # Does not seem to be split by tabs, may be a file.
                metalines.append(({"type": LineType.PARAGRAPH}, line))
                continue

            # Only the first four fields are used. Extra trailing fields
            # (Gopher+ servers append one after the port) are ignored rather
            # than making the whole line fall back to a raw paragraph.
            label, path, host, port = parts[:4]

            # Named differently from the `item_type` parameter on purpose, to
            # avoid rebinding it while looping.
            line_type = ItemType(ltype)
            if line_type == ItemType.INFO:
                meta = {"type": LineType.PARAGRAPH}
                metalines.append((meta, label))
                continue

            # HTML items using the "URL:" prefix link to the URL itself;
            # everything else becomes a gopher URL with the type in its path.
            if line_type == ItemType.HTML and path[:4].upper() == "URL:":
                link_url = path[4:]
            else:
                link_url = f"gopher://{host}:{port}/{ltype}{path}"

            meta = {
                "type": LineType.LINK,
                "url": link_url,
                "link": current_link_id
            }
            links[current_link_id] = link_url

            # Fall back to the raw type character when there is no icon.
            icon = ICONS.get(line_type) or f"({ltype})"
            text = f"[{current_link_id}] {icon} {label}"
            metalines.append((meta, text))
            current_link_id += 1

    return metalines, links
# Module-level instance of the Gopher plugin.
# NOTE(review): presumably the attribute Bebop's plugin loader looks up when
# importing this module -- confirm against the loader.
plugin = GopherPlugin()