2021-06-05 21:48:42 +02:00
|
|
|
import logging
|
|
|
|
import re
|
|
|
|
import socket
|
|
|
|
from enum import Enum
|
|
|
|
from typing import Optional
|
|
|
|
|
|
|
|
from bebop.browser.browser import Browser
|
2021-06-06 03:02:32 +02:00
|
|
|
from bebop.command_line import CommandLine
|
2021-06-05 21:48:42 +02:00
|
|
|
from bebop.links import Links
|
|
|
|
from bebop.metalines import LineType
|
|
|
|
from bebop.navigation import parse_url, parse_host_and_port
|
|
|
|
from bebop.page import Page
|
|
|
|
from bebop.plugins import SchemePlugin
|
|
|
|
|
|
|
|
|
|
|
|
class ItemType(Enum):
|
|
|
|
FILE = "0"
|
|
|
|
DIR = "1"
|
2021-06-06 03:02:32 +02:00
|
|
|
CCSO = "2"
|
2021-06-05 21:48:42 +02:00
|
|
|
ERROR = "3"
|
|
|
|
BINHEXED = "4"
|
|
|
|
DOS = "5"
|
|
|
|
UUENC = "6"
|
|
|
|
SEARCH = "7"
|
|
|
|
TELNET = "8"
|
|
|
|
BINARY = "9"
|
|
|
|
REDUNDANT = "+"
|
|
|
|
TN3270 = "T"
|
|
|
|
GIF = "g"
|
|
|
|
IMAGE = "I"
|
|
|
|
# These are not in the original RFC but encountered frequently.
|
|
|
|
INFO = "i"
|
|
|
|
DOC = "d"
|
|
|
|
HTML = "h"
|
|
|
|
SOUND = "s"
|
|
|
|
_missing_ = lambda s: ItemType.FILE
|
|
|
|
|
|
|
|
|
2021-06-06 03:02:32 +02:00
|
|
|
UNHANDLED_TYPES = (
|
|
|
|
ItemType.CCSO, ItemType.ERROR, ItemType.TELNET, ItemType.REDUNDANT,
|
|
|
|
ItemType.TN3270
|
|
|
|
)
|
2021-06-05 21:48:42 +02:00
|
|
|
ICONS = {
|
|
|
|
ItemType.FILE: "📄",
|
|
|
|
ItemType.DIR: "📂",
|
|
|
|
ItemType.ERROR: "❌",
|
2021-06-06 03:02:32 +02:00
|
|
|
ItemType.SEARCH: "🤔",
|
2021-06-05 21:48:42 +02:00
|
|
|
ItemType.HTML: "🌐",
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
# This regex checks if the URL respects RFC 4266 and has an item type.
|
|
|
|
TYPE_PATH_RE = re.compile(r"^/([\d\+TgIidhs])(.*)")
|
|
|
|
|
|
|
|
|
|
|
|
class GopherPluginException(Exception):
|
|
|
|
|
|
|
|
def __init__(self, message: str) -> None:
|
|
|
|
super().__init__()
|
|
|
|
self.message = message
|
|
|
|
|
|
|
|
|
|
|
|
class GopherPlugin(SchemePlugin):
|
|
|
|
|
|
|
|
def __init__(self) -> None:
|
|
|
|
super().__init__("gopher")
|
|
|
|
|
|
|
|
def open_url(self, browser: Browser, url: str) -> Optional[str]:
|
|
|
|
parts = parse_url(url)
|
|
|
|
host = parts["netloc"]
|
|
|
|
host_and_port = parse_host_and_port(host, 70)
|
|
|
|
if host_and_port is None:
|
|
|
|
browser.set_status_error("Could not parse gopher URL.")
|
|
|
|
return None
|
|
|
|
host, port = host_and_port
|
|
|
|
path = parts["path"]
|
|
|
|
|
|
|
|
# If the URL has an item type, use it to properly parse the response.
|
|
|
|
type_path_match = TYPE_PATH_RE.match(path)
|
|
|
|
if type_path_match:
|
|
|
|
item_type = ItemType(type_path_match.group(1))
|
|
|
|
path = type_path_match.group(2)
|
2021-06-06 03:02:32 +02:00
|
|
|
# Don't try to open a Telnet connection or other silly retro things.
|
|
|
|
if item_type in UNHANDLED_TYPES:
|
|
|
|
browser.set_status_error(f"Unhandled item {item_type.name}.")
|
|
|
|
return None
|
|
|
|
# Let user input some text for search items.
|
|
|
|
if item_type == ItemType.SEARCH:
|
|
|
|
user_input = browser.get_user_text_input(
|
|
|
|
"Input:",
|
|
|
|
CommandLine.CHAR_TEXT,
|
|
|
|
strip=True
|
|
|
|
)
|
|
|
|
if not user_input:
|
|
|
|
return None
|
|
|
|
item_type = ItemType.DIR
|
|
|
|
previous_search_index = path.find("%09")
|
|
|
|
if previous_search_index > -1:
|
|
|
|
path = path[:previous_search_index]
|
|
|
|
path = f"{path}\t{user_input}"
|
|
|
|
# Note that we don't try to handle "h" items here because if the URL
|
|
|
|
# actually uses http scheme, it should not end up in this plugin.
|
2021-06-05 21:48:42 +02:00
|
|
|
else:
|
|
|
|
item_type = ItemType.DIR
|
2021-06-06 03:02:32 +02:00
|
|
|
|
|
|
|
# If we have text search in our path, encode it for UI & logging.
|
|
|
|
encoded_path = path.replace("\t", "%09")
|
|
|
|
browser.set_status(f"Loading {host} {port} '{encoded_path}'…")
|
|
|
|
|
2021-06-05 21:48:42 +02:00
|
|
|
timeout = browser.config["connect_timeout"]
|
|
|
|
try:
|
|
|
|
response = self.request(host, port, path, timeout)
|
|
|
|
page = parse_response(response, item_type)
|
|
|
|
except GopherPluginException as exc:
|
|
|
|
browser.set_status_error("Error: " + exc.message)
|
|
|
|
return None
|
2021-06-06 03:02:32 +02:00
|
|
|
|
2021-06-05 21:48:42 +02:00
|
|
|
browser.load_page(page)
|
2021-06-06 03:02:32 +02:00
|
|
|
url = f"gopher://{host}:{port}/{item_type.value}{encoded_path}"
|
2021-06-05 21:48:42 +02:00
|
|
|
browser.current_url = url
|
|
|
|
return url
|
|
|
|
|
|
|
|
def request(self, host: str, port: int, path: str, timeout: int):
|
|
|
|
try:
|
|
|
|
sock = socket.create_connection((host, port), timeout=timeout)
|
|
|
|
except OSError as exc:
|
|
|
|
raise GopherPluginException("failed to establish connection")
|
|
|
|
|
|
|
|
try:
|
|
|
|
request_str = path.encode() + b"\r\n"
|
|
|
|
except ValueError as exc:
|
|
|
|
raise GopherPluginException("could not encode path")
|
|
|
|
|
|
|
|
sock.sendall(request_str)
|
|
|
|
response = b""
|
|
|
|
while True:
|
|
|
|
try:
|
|
|
|
buf = sock.recv(4096)
|
|
|
|
except socket.timeout:
|
|
|
|
buf = None
|
|
|
|
if not buf:
|
|
|
|
return response
|
|
|
|
response += buf
|
|
|
|
return decoded
|
|
|
|
|
|
|
|
|
|
|
|
def parse_response(response: bytes, item_type: ItemType, encoding: str ="utf8"):
|
|
|
|
decoded = response.decode(encoding=encoding, errors="replace")
|
|
|
|
metalines, links = parse_source(decoded, item_type)
|
|
|
|
return Page(decoded, metalines, links)
|
|
|
|
|
|
|
|
|
|
|
|
def parse_source(source: str, item_type: ItemType):
|
|
|
|
metalines = []
|
|
|
|
links = Links()
|
|
|
|
|
|
|
|
if item_type == ItemType.FILE:
|
|
|
|
for line in source.split("\n"):
|
|
|
|
line = line.rstrip("\r")
|
|
|
|
metalines.append(({"type": LineType.PARAGRAPH}, line))
|
|
|
|
|
|
|
|
# Gopher maps are kind of the default here, so it should be quite safe to
|
|
|
|
# parse any kind of text data.
|
|
|
|
elif item_type == ItemType.DIR:
|
|
|
|
current_link_id = 1
|
|
|
|
for line in source.split("\r\n"):
|
|
|
|
ltype, tline = line[:1], line[1:]
|
|
|
|
if ltype == "." and not tline:
|
|
|
|
break
|
|
|
|
|
|
|
|
parts = tline.split("\t")
|
|
|
|
if len(parts) != 4:
|
|
|
|
# TODO move me away
|
|
|
|
# Does not seem to be split by tabs, may be a file.
|
|
|
|
metalines.append(({"type": LineType.PARAGRAPH}, line))
|
|
|
|
continue
|
|
|
|
|
|
|
|
item_type = ItemType(ltype)
|
|
|
|
label, path, host, port = parts
|
|
|
|
if item_type == ItemType.INFO:
|
|
|
|
meta = {"type": LineType.PARAGRAPH}
|
|
|
|
metalines.append((meta, label))
|
|
|
|
continue
|
|
|
|
|
|
|
|
if item_type == ItemType.HTML and path[:4].upper() == "URL:":
|
|
|
|
link_url = path[4:]
|
|
|
|
else:
|
|
|
|
link_url = f"gopher://{host}:{port}/{ltype}{path}"
|
|
|
|
|
|
|
|
meta = {
|
|
|
|
"type": LineType.LINK,
|
|
|
|
"url": link_url,
|
|
|
|
"link": current_link_id
|
|
|
|
}
|
|
|
|
links[current_link_id] = link_url
|
|
|
|
|
|
|
|
icon = ICONS.get(item_type) or f"({ltype})"
|
|
|
|
text = f"[{current_link_id}] {icon} {label}"
|
|
|
|
metalines.append((meta, text))
|
|
|
|
current_link_id += 1
|
|
|
|
|
|
|
|
return metalines, links
|
|
|
|
|
|
|
|
|
|
|
|
plugin = GopherPlugin()
|