Compare commits

...

5 Commits

@ -1,12 +1,12 @@
TODO
--------------------------------------------------------------------------------
gopher plugin
BACKLOG
--------------------------------------------------------------------------------
get/set config using command-line
gopher?
async event system
download without memory buffer
download in the background

@ -114,6 +114,10 @@ class Browser:
"""Return the scheme of the current URL."""
return parse_url(self._current_url)["scheme"] or ""
@property
def current_page(self) -> Optional[Page]:
return self.page_pad.current_page
def setup_special_pages(self):
"""Return a dict with the special pages functions."""
return {
@ -486,9 +490,9 @@ class Browser:
def handle_digit_input(self, init_char: int):
"""Focus command-line to select the link ID to follow."""
if self.page_pad.current_page is None:
if self.current_page is None:
return
links = self.page_pad.current_page.links
links = self.current_page.links
if links is None:
return
err, val = self.command_line.focus_for_link_navigation(init_char, links)
@ -521,14 +525,13 @@ class Browser:
If the click is on a link (appropriate line and columns), open it.
"""
page = self.page_pad.current_page
if not page:
if not self.current_page:
return
px, py = self.page_pad.current_column, self.page_pad.current_line
line_pos = y + py
if line_pos >= len(page.metalines):
if line_pos >= len(self.current_page.metalines):
return
meta, line = page.metalines[line_pos]
meta, line = self.current_page.metalines[line_pos]
if meta["type"] != LineType.LINK:
return
# "url" key is contained only in the first line of the link if its text
@ -536,7 +539,7 @@ class Browser:
# get the URL.
while "url" not in meta:
line_pos -= 1
meta, line = page.metalines[line_pos]
meta, line = self.current_page.metalines[line_pos]
url = meta["url"]
# The click is valid if it is on the link itself or the dimmed preview.
col_pos = x + px
@ -662,7 +665,7 @@ class Browser:
"""Add the current URL as bookmark."""
if not self.current_url:
return
current_title = self.page_pad.current_page.title or ""
current_title = self.current_page.title or ""
title = self.get_user_text_input(
"Bookmark title?",
CommandLine.CHAR_TEXT,
@ -690,9 +693,9 @@ class Browser:
get_source = special_pages_functions.get("source")
source_filename = get_source() if get_source else None
else:
if not self.page_pad.current_page:
if not self.current_page:
return
source = self.page_pad.current_page.source
source = self.current_page.source
with tempfile.NamedTemporaryFile("wt", delete=False) as source_file:
source_file.write(source)
source_filename = source_file.name
@ -754,7 +757,7 @@ class Browser:
def show_page_info(self):
"""Show some page informations in the status bar."""
page = self.page_pad.current_page
page = self.current_page
if not page:
return
mime = page.mime.short if page.mime else "(unknown MIME type)"
@ -786,10 +789,10 @@ class Browser:
def toggle_render_mode(self):
"""Switch to the next render mode for the current page."""
page = self.page_pad.current_page
if not page:
page = self.current_page
if not page or page.render is None:
return
if page.render is None or page.render not in RENDER_MODES:
if page.render not in RENDER_MODES:
next_mode = RENDER_MODES[0]
else:
cur_mod_index = RENDER_MODES.index(page.render)
@ -804,14 +807,13 @@ class Browser:
def search_in_page(self):
"""Search for words in the page."""
page = self.page_pad.current_page
if not page:
if not self.current_page:
return
search = self.get_user_text_input("Search", CommandLine.CHAR_TEXT)
if not search:
return
self.search_res_lines = []
for index, (_, line) in enumerate(page.metalines):
for index, (_, line) in enumerate(self.current_page.metalines):
if search in line:
self.search_res_lines.append(index)
if self.search_res_lines:

@ -176,3 +176,16 @@ def get_root_url(url: str) -> str:
parts["path"] = "/"
clear_post_path(parts)
return unparse_url(parts)
def parse_host_and_port(host: str, default_port: int):
"""Return the host and port from a "host:port" string."""
if ":" in host:
host, port = host.split(":", maxsplit=1)
try:
port = int(port)
except ValueError:
return None
else:
port = default_port
return host, port

@ -9,6 +9,7 @@ from enum import IntEnum
from typing import Optional
from bebop.mime import DEFAULT_MIME_TYPE, MimeType
from bebop.navigation import parse_host_and_port
from bebop.tofu import CertStatus, validate_cert
@ -127,16 +128,13 @@ class Request:
if not url_parts:
self.state = Request.STATE_INVALID_URL
return False
hostname = url_parts.groupdict()["host"]
if ":" in hostname:
hostname, port = hostname.split(":", maxsplit=1)
try:
port = int(port)
except ValueError:
self.state = Request.STATE_INVALID_URL
return False
else:
port = 1965
host = url_parts.groupdict()["host"]
host_and_port = parse_host_and_port(host, 1965)
if host_and_port is None:
self.state = Request.STATE_INVALID_URL
return False
hostname, port = host_and_port
self.hostname = hostname
# Prepare the Gemini request.

@ -0,0 +1,10 @@
Gopher plugin for Bebop
=======================
This is a Gopher plugin for [Bebop][bebop], refer to its docs for details.
[bebop]: https://git.dece.space/Dece/Bebop
Requires:
* Bebop >= 0.2.0

@ -0,0 +1 @@
from .plugin import plugin

@ -0,0 +1,205 @@
import logging
import re
import socket
from enum import Enum
from typing import Optional
from bebop.browser.browser import Browser
from bebop.command_line import CommandLine
from bebop.links import Links
from bebop.metalines import LineType
from bebop.navigation import parse_url, parse_host_and_port
from bebop.page import Page
from bebop.plugins import SchemePlugin
class ItemType(Enum):
FILE = "0"
DIR = "1"
CCSO = "2"
ERROR = "3"
BINHEXED = "4"
DOS = "5"
UUENC = "6"
SEARCH = "7"
TELNET = "8"
BINARY = "9"
REDUNDANT = "+"
TN3270 = "T"
GIF = "g"
IMAGE = "I"
# These are not in the original RFC but encountered frequently.
INFO = "i"
DOC = "d"
HTML = "h"
SOUND = "s"
_missing_ = lambda s: ItemType.FILE
UNHANDLED_TYPES = (
ItemType.CCSO, ItemType.ERROR, ItemType.TELNET, ItemType.REDUNDANT,
ItemType.TN3270
)
ICONS = {
ItemType.FILE: "📄",
ItemType.DIR: "📂",
ItemType.ERROR: "",
ItemType.SEARCH: "🤔",
ItemType.HTML: "🌐",
}
# This regex checks if the URL respects RFC 4266 and has an item type.
TYPE_PATH_RE = re.compile(r"^/([\d\+TgIidhs])(.*)")
class GopherPluginException(Exception):
def __init__(self, message: str) -> None:
super().__init__()
self.message = message
class GopherPlugin(SchemePlugin):
def __init__(self) -> None:
super().__init__("gopher")
def open_url(self, browser: Browser, url: str) -> Optional[str]:
parts = parse_url(url)
host = parts["netloc"]
host_and_port = parse_host_and_port(host, 70)
if host_and_port is None:
browser.set_status_error("Could not parse gopher URL.")
return None
host, port = host_and_port
path = parts["path"]
# If the URL has an item type, use it to properly parse the response.
type_path_match = TYPE_PATH_RE.match(path)
if type_path_match:
item_type = ItemType(type_path_match.group(1))
path = type_path_match.group(2)
# Don't try to open a Telnet connection or other silly retro things.
if item_type in UNHANDLED_TYPES:
browser.set_status_error(f"Unhandled item {item_type.name}.")
return None
# Let user input some text for search items.
if item_type == ItemType.SEARCH:
user_input = browser.get_user_text_input(
"Input:",
CommandLine.CHAR_TEXT,
strip=True
)
if not user_input:
return None
item_type = ItemType.DIR
previous_search_index = path.find("%09")
if previous_search_index > -1:
path = path[:previous_search_index]
path = f"{path}\t{user_input}"
# Note that we don't try to handle "h" items here because if the URL
# actually uses http scheme, it should not end up in this plugin.
else:
item_type = ItemType.DIR
# If we have text search in our path, encode it for UI & logging.
encoded_path = path.replace("\t", "%09")
browser.set_status(f"Loading {host} {port} '{encoded_path}'")
timeout = browser.config["connect_timeout"]
try:
response = self.request(host, port, path, timeout)
page = parse_response(response, item_type)
except GopherPluginException as exc:
browser.set_status_error("Error: " + exc.message)
return None
browser.load_page(page)
url = f"gopher://{host}:{port}/{item_type.value}{encoded_path}"
browser.current_url = url
return url
def request(self, host: str, port: int, path: str, timeout: int):
try:
sock = socket.create_connection((host, port), timeout=timeout)
except OSError as exc:
raise GopherPluginException("failed to establish connection")
try:
request_str = path.encode() + b"\r\n"
except ValueError as exc:
raise GopherPluginException("could not encode path")
sock.sendall(request_str)
response = b""
while True:
try:
buf = sock.recv(4096)
except socket.timeout:
buf = None
if not buf:
return response
response += buf
return decoded
def parse_response(response: bytes, item_type: ItemType, encoding: str ="utf8"):
decoded = response.decode(encoding=encoding, errors="replace")
metalines, links = parse_source(decoded, item_type)
return Page(decoded, metalines, links)
def parse_source(source: str, item_type: ItemType):
metalines = []
links = Links()
if item_type == ItemType.FILE:
for line in source.split("\n"):
line = line.rstrip("\r")
metalines.append(({"type": LineType.PARAGRAPH}, line))
# Gopher maps are kind of the default here, so it should be quite safe to
# parse any kind of text data.
elif item_type == ItemType.DIR:
current_link_id = 1
for line in source.split("\r\n"):
ltype, tline = line[:1], line[1:]
if ltype == "." and not tline:
break
parts = tline.split("\t")
if len(parts) != 4:
# TODO move me away
# Does not seem to be split by tabs, may be a file.
metalines.append(({"type": LineType.PARAGRAPH}, line))
continue
item_type = ItemType(ltype)
label, path, host, port = parts
if item_type == ItemType.INFO:
meta = {"type": LineType.PARAGRAPH}
metalines.append((meta, label))
continue
if item_type == ItemType.HTML and path[:4].upper() == "URL:":
link_url = path[4:]
else:
link_url = f"gopher://{host}:{port}/{ltype}{path}"
meta = {
"type": LineType.LINK,
"url": link_url,
"link": current_link_id
}
links[current_link_id] = link_url
icon = ICONS.get(item_type) or f"({ltype})"
text = f"[{current_link_id}] {icon} {label}"
metalines.append((meta, text))
current_link_id += 1
return metalines, links
plugin = GopherPlugin()

@ -0,0 +1,19 @@
[metadata]
name = bebop-browser-gopher
version = 0.1.0
description = Gopher plugin for the Bebop terminal browser
long_description = file: README.md
license = GPLv3
author = dece
author-email = shgck@pistache.land
home-page = https://git.dece.space/Dece/Bebop
classifiers =
Environment :: Console
Programming Language :: Python :: 3
Programming Language :: Python :: 3.7
[options]
packages = bebop_gopher
python_requires = >= 3.7
setup_requires = setuptools >= 38.3.0
Loading…
Cancel
Save