|
|
|
@ -6,11 +6,13 @@ from typing import Optional
|
|
|
|
|
|
|
|
|
|
from bebop.browser.browser import Browser
|
|
|
|
|
from bebop.command_line import CommandLine
|
|
|
|
|
from bebop.downloads import get_download_path
|
|
|
|
|
from bebop.links import Links
|
|
|
|
|
from bebop.metalines import LineType
|
|
|
|
|
from bebop.navigation import parse_url, parse_host_and_port
|
|
|
|
|
from bebop.mime import MimeType
|
|
|
|
|
from bebop.navigation import parse_url, parse_host_and_port, unparse_url
|
|
|
|
|
from bebop.page import Page
|
|
|
|
|
from bebop.plugins import SchemePlugin
|
|
|
|
|
from bebop.plugins import PluginCommand, SchemePlugin
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ItemType(Enum):
|
|
|
|
@ -36,15 +38,23 @@ class ItemType(Enum):
|
|
|
|
|
_missing_ = lambda s: ItemType.FILE
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Types that can be parsed as a page (see `parse_source`).
|
|
|
|
|
PARSABLE_TYPES = (ItemType.FILE, ItemType.DIR)
|
|
|
|
|
# Types that are not rendered by this plugin; should be handled by a separate
|
|
|
|
|
# program, but for now we simply do nothing with them.
|
|
|
|
|
UNHANDLED_TYPES = (
|
|
|
|
|
ItemType.CCSO, ItemType.ERROR, ItemType.TELNET, ItemType.REDUNDANT,
|
|
|
|
|
ItemType.TN3270
|
|
|
|
|
)
|
|
|
|
|
# Map item types lowercase names to the actual type, to easily set a type from
|
|
|
|
|
# the command-line.
|
|
|
|
|
USER_FRIENDLY_TYPES = {t.name.lower(): t for t in ItemType}
|
|
|
|
|
# Icons to display for some item types in a Gopher map.
|
|
|
|
|
ICONS = {
|
|
|
|
|
ItemType.FILE: "📄",
|
|
|
|
|
ItemType.DIR: "📂",
|
|
|
|
|
ItemType.ERROR: "❌",
|
|
|
|
|
ItemType.SEARCH: "🤔",
|
|
|
|
|
ItemType.SEARCH: "✍ ",
|
|
|
|
|
ItemType.HTML: "🌐",
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -64,8 +74,20 @@ class GopherPlugin(SchemePlugin):
|
|
|
|
|
|
|
|
|
|
def __init__(self) -> None:
|
|
|
|
|
super().__init__("gopher")
|
|
|
|
|
self.commands = [
|
|
|
|
|
PluginCommand(
|
|
|
|
|
"set-item-type",
|
|
|
|
|
"display current page as another item type (Gopher only)"
|
|
|
|
|
)
|
|
|
|
|
]
|
|
|
|
|
|
|
|
|
|
def open_url(self, browser: Browser, url: str) -> Optional[str]:
|
|
|
|
|
"""Request an selector from a Gopher host.
|
|
|
|
|
|
|
|
|
|
As Bebop works only with URLs and not really the Gopher host/selector
|
|
|
|
|
format, we use RFC 4266 (“The gopher URI Scheme”) for consistency with
|
|
|
|
|
other schemes and to get that sweet item type hint in the URL path.
|
|
|
|
|
"""
|
|
|
|
|
parts = parse_url(url)
|
|
|
|
|
host = parts["netloc"]
|
|
|
|
|
host_and_port = parse_host_and_port(host, 70)
|
|
|
|
@ -73,7 +95,8 @@ class GopherPlugin(SchemePlugin):
|
|
|
|
|
browser.set_status_error("Could not parse gopher URL.")
|
|
|
|
|
return None
|
|
|
|
|
host, port = host_and_port
|
|
|
|
|
path = parts["path"]
|
|
|
|
|
# Decode path; spaces in Gopher URLs are encoded for display in Bebop.
|
|
|
|
|
path = parts["path"].replace("%20", " ")
|
|
|
|
|
|
|
|
|
|
# If the URL has an item type, use it to properly parse the response.
|
|
|
|
|
type_path_match = TYPE_PATH_RE.match(path)
|
|
|
|
@ -103,54 +126,115 @@ class GopherPlugin(SchemePlugin):
|
|
|
|
|
else:
|
|
|
|
|
item_type = ItemType.DIR
|
|
|
|
|
|
|
|
|
|
# If we have text search in our path, encode it for UI & logging.
|
|
|
|
|
encoded_path = path.replace("\t", "%09")
|
|
|
|
|
# If we have spaces in our path, encode it for UI & logging.
|
|
|
|
|
encoded_path = path.replace(" ", "%20").replace("\t", "%09")
|
|
|
|
|
browser.set_status(f"Loading {host} {port} '{encoded_path}'…")
|
|
|
|
|
|
|
|
|
|
timeout = browser.config["connect_timeout"]
|
|
|
|
|
try:
|
|
|
|
|
response = self.request(host, port, path, timeout)
|
|
|
|
|
page = parse_response(response, item_type)
|
|
|
|
|
response = request(host, port, path, timeout)
|
|
|
|
|
except GopherPluginException as exc:
|
|
|
|
|
browser.set_status_error("Error: " + exc.message)
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
browser.load_page(page)
|
|
|
|
|
url = f"gopher://{host}:{port}/{item_type.value}{encoded_path}"
|
|
|
|
|
browser.current_url = url
|
|
|
|
|
if item_type in PARSABLE_TYPES:
|
|
|
|
|
page = parse_response(response, item_type)
|
|
|
|
|
browser.load_page(page)
|
|
|
|
|
browser.current_url = url
|
|
|
|
|
else:
|
|
|
|
|
download_dir = browser.config["download_path"]
|
|
|
|
|
filepath = get_download_path(url, download_dir=download_dir)
|
|
|
|
|
try:
|
|
|
|
|
with open(filepath, "wb") as download_file:
|
|
|
|
|
download_file.write(response)
|
|
|
|
|
except OSError as exc:
|
|
|
|
|
browser.set_status_error(f"Failed to save {url} ({exc})")
|
|
|
|
|
return None
|
|
|
|
|
else:
|
|
|
|
|
browser.set_status(f"Downloaded {url}.")
|
|
|
|
|
browser.last_download = None, filepath
|
|
|
|
|
|
|
|
|
|
return url
|
|
|
|
|
|
|
|
|
|
def request(self, host: str, port: int, path: str, timeout: int):
|
|
|
|
|
try:
|
|
|
|
|
sock = socket.create_connection((host, port), timeout=timeout)
|
|
|
|
|
except OSError as exc:
|
|
|
|
|
raise GopherPluginException("failed to establish connection")
|
|
|
|
|
def use_command(self, browser: Browser, name: str, text: str):
|
|
|
|
|
if name == "set-item-type":
|
|
|
|
|
given_type = text[len(name):].strip()
|
|
|
|
|
valid_types = [
|
|
|
|
|
t for t in USER_FRIENDLY_TYPES
|
|
|
|
|
if USER_FRIENDLY_TYPES[t] not in UNHANDLED_TYPES
|
|
|
|
|
]
|
|
|
|
|
if given_type not in valid_types:
|
|
|
|
|
error = "Valid types: " + ", ".join(valid_types)
|
|
|
|
|
browser.set_status_error(error)
|
|
|
|
|
return
|
|
|
|
|
item_type = USER_FRIENDLY_TYPES[given_type]
|
|
|
|
|
self.set_item_type(browser, item_type)
|
|
|
|
|
|
|
|
|
|
def set_item_type(self, browser: Browser, item_type: ItemType):
|
|
|
|
|
"""Re-parse the current page using this item type."""
|
|
|
|
|
if browser.current_scheme != self.scheme or not browser.current_page:
|
|
|
|
|
browser.set_status_error("Can only set item types on Gopher URLs.")
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
logging.debug(f"Force parsing current page as {item_type}…")
|
|
|
|
|
current_source = browser.current_page.source
|
|
|
|
|
new_page = get_page_from_source(current_source, item_type)
|
|
|
|
|
browser.load_page(new_page)
|
|
|
|
|
|
|
|
|
|
# If possible, set the correct item type in the URL path as well.
|
|
|
|
|
url = browser.current_url
|
|
|
|
|
parts = parse_url(browser.current_url)
|
|
|
|
|
type_path_match = TYPE_PATH_RE.match(parts["path"])
|
|
|
|
|
if type_path_match:
|
|
|
|
|
path = type_path_match.group(2)
|
|
|
|
|
parts["path"] = f"/{item_type.value}{path}"
|
|
|
|
|
browser.current_url = unparse_url(parts)
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
request_str = path.encode() + b"\r\n"
|
|
|
|
|
except ValueError as exc:
|
|
|
|
|
raise GopherPluginException("could not encode path")
|
|
|
|
|
|
|
|
|
|
sock.sendall(request_str)
|
|
|
|
|
response = b""
|
|
|
|
|
while True:
|
|
|
|
|
try:
|
|
|
|
|
buf = sock.recv(4096)
|
|
|
|
|
except socket.timeout:
|
|
|
|
|
buf = None
|
|
|
|
|
if not buf:
|
|
|
|
|
return response
|
|
|
|
|
response += buf
|
|
|
|
|
return decoded
|
|
|
|
|
def request(host: str, port: int, path: str, timeout: int) -> bytes:
|
|
|
|
|
"""Send a Gopher request and return the received bytes."""
|
|
|
|
|
try:
|
|
|
|
|
sock = socket.create_connection((host, port), timeout=timeout)
|
|
|
|
|
except OSError as exc:
|
|
|
|
|
raise GopherPluginException("failed to establish connection")
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
request_str = path.encode() + b"\r\n"
|
|
|
|
|
except ValueError as exc:
|
|
|
|
|
raise GopherPluginException("could not encode path")
|
|
|
|
|
|
|
|
|
|
sock.sendall(request_str)
|
|
|
|
|
response = b""
|
|
|
|
|
while True:
|
|
|
|
|
try:
|
|
|
|
|
buf = sock.recv(4096)
|
|
|
|
|
except socket.timeout:
|
|
|
|
|
buf = None
|
|
|
|
|
if not buf:
|
|
|
|
|
return response
|
|
|
|
|
response += buf
|
|
|
|
|
return decoded
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def parse_response(response: bytes, item_type: ItemType, encoding: str ="utf8"):
|
|
|
|
|
"""Parse a Gopher response."""
|
|
|
|
|
decoded = response.decode(encoding=encoding, errors="replace")
|
|
|
|
|
metalines, links = parse_source(decoded, item_type)
|
|
|
|
|
return Page(decoded, metalines, links)
|
|
|
|
|
return get_page_from_source(decoded, item_type)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_page_from_source(source: str, item_type: ItemType):
|
|
|
|
|
"""Get a Page object from a decoded source text."""
|
|
|
|
|
metalines, links = parse_source(source, item_type)
|
|
|
|
|
return Page(source, metalines, links)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def parse_source(source: str, item_type: ItemType):
|
|
|
|
|
"""Generate metalines and a Links instance for this source text.
|
|
|
|
|
|
|
|
|
|
The item_type must be a type that can be parsed: FILE or DIR. Any other
|
|
|
|
|
item type will silently result in no metalines.
|
|
|
|
|
"""
|
|
|
|
|
metalines = []
|
|
|
|
|
links = Links()
|
|
|
|
|
|
|
|
|
@ -163,7 +247,10 @@ def parse_source(source: str, item_type: ItemType):
|
|
|
|
|
# parse any kind of text data.
|
|
|
|
|
elif item_type == ItemType.DIR:
|
|
|
|
|
current_link_id = 1
|
|
|
|
|
for line in source.split("\r\n"):
|
|
|
|
|
# Split lines on \n and discard \r separately because some maps do not
|
|
|
|
|
# end lines with \r\n all the time.
|
|
|
|
|
for line in source.split("\n"):
|
|
|
|
|
line = line.rstrip("\r")
|
|
|
|
|
ltype, tline = line[:1], line[1:]
|
|
|
|
|
if ltype == "." and not tline:
|
|
|
|
|
break
|
|
|
|
|