init: basic protocol/nav/rendering

This commit is contained in:
dece 2021-02-12 19:01:42 +01:00
commit 6d676d0471
16 changed files with 1202 additions and 0 deletions

3
.gitignore vendored Normal file
View file

@ -0,0 +1,3 @@
*.pyc
/venv/

0
bebop/__init__.py Normal file
View file

23
bebop/__main__.py Normal file
View file

@ -0,0 +1,23 @@
import argparse
from bebop.tofu import load_cert_stash
from bebop.screen import Screen
def main():
argparser = argparse.ArgumentParser()
argparser.add_argument("url", default=None)
args = argparser.parse_args()
if args.url:
start_url = args.url
if not start_url.startswith("gemini://"):
start_url = "gemini://" + start_url
else:
start_url = None
cert_stash = load_cert_stash("/tmp/stash")
Screen(cert_stash).run(start_url=start_url)
main()

29
bebop/colors.py Normal file
View file

@ -0,0 +1,29 @@
import curses
from enum import IntEnum
class ColorPair(IntEnum):
NORMAL = 0
ERROR = 1
LINK = 2
LINK_ID = 3
TITLE_1 = 4
TITLE_2 = 5
TITLE_3 = 6
PREFORMATTED = 7
BLOCKQUOTE = 8
DEBUG = 99
def init_colors():
curses.use_default_colors()
curses.init_pair(ColorPair.NORMAL, curses.COLOR_WHITE, -1)
curses.init_pair(ColorPair.ERROR, curses.COLOR_RED, -1)
curses.init_pair(ColorPair.LINK, curses.COLOR_CYAN, -1)
curses.init_pair(ColorPair.LINK_ID, curses.COLOR_WHITE, -1)
curses.init_pair(ColorPair.TITLE_1, curses.COLOR_GREEN, -1)
curses.init_pair(ColorPair.TITLE_2, curses.COLOR_MAGENTA, -1)
curses.init_pair(ColorPair.TITLE_3, curses.COLOR_MAGENTA, -1)
curses.init_pair(ColorPair.PREFORMATTED, curses.COLOR_YELLOW, -1)
curses.init_pair(ColorPair.BLOCKQUOTE, curses.COLOR_CYAN, -1)
curses.init_pair(ColorPair.DEBUG, curses.COLOR_BLACK, curses.COLOR_GREEN)

79
bebop/gemtext.py Normal file
View file

@ -0,0 +1,79 @@
import re
import typing
from dataclasses import dataclass
@dataclass
class Paragraph:
text: str
@dataclass
class Title:
level: int
text: str
RE = re.compile(r"(#{1,3})\s+(.+)")
@dataclass
class Link:
url: str
text: str
RE = re.compile(r"=>\s*(?P<url>\S+)(\s+(?P<text>.+))?")
@dataclass
class Preformatted:
lines: typing.List[str]
FENCE = "```"
@dataclass
class Blockquote:
text: str
RE = re.compile(r">\s*(.*)")
def parse_gemtext(data):
"""Parse UTF-8 encoded Gemtext as a list of elements."""
text = data.decode(encoding="utf8", errors="ignore")
elements = []
preformatted = None
for line in text.splitlines():
line = line.rstrip()
if not line:
continue
match = Title.RE.match(line)
if match:
hashtags, text = match.groups()
elements.append(Title(hashtags.count("#"), text))
continue
match = Link.RE.match(line)
if match:
match_dict = match.groupdict()
url, text = match_dict["url"], match_dict.get("text", "")
elements.append(Link(url, text))
continue
if line == Preformatted.FENCE:
if preformatted:
elements.append(preformatted)
preformatted = None
else:
preformatted = Preformatted([])
continue
match = Blockquote.RE.match(line)
if match:
text = match.groups()[0]
elements.append(Blockquote(text))
continue
if preformatted:
preformatted.lines.append(line)
else:
elements.append(Paragraph(line))
return elements

28
bebop/mouse.py Normal file
View file

@ -0,0 +1,28 @@
from enum import IntEnum
class ButtonState(IntEnum):
"""Most common flags from curses.getmouse()'s bstate.
Could not find a clear reference for that and released/pressed seem inverted
compared to snippets on the Web, so take portability with a grain of salt.
"""
LEFT_RELEASED = 1 << 0
LEFT_PRESSED = 1 << 1
LEFT_CLICKED = 1 << 2
LEFT_DCLICKED = 1 << 3
LEFT_TCLICKED = 1 << 4
MIDDLE_RELEASED = 1 << 5
MIDDLE_PRESSED = 1 << 6
MIDDLE_CLICKED = 1 << 7
MIDDLE_DCLICKED = 1 << 8
MIDDLE_TCLICKED = 1 << 9
RIGHT_RELEASED = 1 << 10
RIGHT_PRESSED = 1 << 11
RIGHT_CLICKED = 1 << 12
RIGHT_DCLICKED = 1 << 13
RIGHT_TCLICKED = 1 << 14
SCROLL_UP = 1 << 16
SCROLL_DOWN = 1 << 21
SHIFT = 1 << 25
CTRL = 1 << 26

34
bebop/navigation.py Normal file
View file

@ -0,0 +1,34 @@
import re
import urllib.parse
def parse_url(url, absolute=False):
"""Return URL parts from this URL.
This uses urllib.parse.urlparse to not reinvent the wheel, with a few
adjustments.
First, urllib does not know the Gemini scheme (yet!) so if it
is specified we strip it to get an absolute netloc.
Second, as this function can be used to process arbitrary user input, we
clean it a bit:
- strip whitespaces from the URL
- if "absolute" is True, consider that the URL is meant to be absolute, even
though it technically is not, e.g. "dece.space" is not absolute as it
misses either the // delimiter.
"""
if url.startswith("gemini://"):
url = url[7:]
parts = urllib.parse.urlparse(url, scheme="gemini")
if not parts.netloc and absolute:
parts = urllib.parse.urlparse(f"//{url}", scheme="gemini")
return parts
def join_url(base_url, url):
"""Join a base URL with a relative url."""
if base_url.startswith("gemini://"):
base_url = base_url[7:]
parts = parse_url(urllib.parse.urljoin(base_url, url))
return urllib.parse.urlunparse(parts)

175
bebop/protocol.py Normal file
View file

@ -0,0 +1,175 @@
import re
import socket
import ssl
from dataclasses import dataclass
from enum import IntEnum
from bebop.tofu import CertStatus, CERT_STATUS_INVALID, validate_cert
GEMINI_URL_RE = re.compile(r"gemini://(?P<host>[^/]+)(?P<path>.*)")
LINE_TERM = b"\r\n"
def parse_gemini_url(url):
"""Return a dict containing the hostname and the request path, or None."""
match = GEMINI_URL_RE.match(url)
return match.groupdict() if match else None
class Request:
"""A Gemini request."""
# Initial state, connection is not established yet.
STATE_INIT = 0
# An error has occured during cert verification, connection is aborted.
STATE_ERROR_CERT = 1
# An invalid URL has been provided, connection is aborted.
STATE_INVALID_URL = 2
# Invalid cert: user should abort or temporarily trust the cert.
STATE_INVALID_CERT = 3
# Unknown cert: user should abort, temporarily or always trust the cert.
STATE_UNKNOWN_CERT = 4
# Untrusted cert: connection is aborted, manually edit the stash.
STATE_UNTRUSTED_CERT = 5
# Valid and trusted cert: proceed.
STATE_OK = 6
def __init__(self, url, cert_stash):
self.url = url
self.cert_stash = cert_stash
self.state = Request.STATE_INIT
self.payload = b""
self.ssock = None
self.cert = None
self.cert_status = None
def connect(self):
"""Connect to a Gemini server and return a RequestEventType.
Return True if the connection is established. The caller has to verify
the request state and propose appropriate choices to the user if the
certificate status is not CertStatus.VALID (Request.STATE_OK).
If connect returns False, the secure socket is aborted before return. If
connect returns True, it is up to the caller to decide whether to
continue (call proceed) the connection or abort it (call abort).
"""
url_parts = parse_gemini_url(self.url)
if not url_parts:
self.state = Request.STATE_INVALID_URL
return False
hostname = url_parts["host"]
try:
self.payload = self.url.encode()
except ValueError:
self.state = Request.STATE_INVALID_URL
return False
self.payload += LINE_TERM
context = Request.get_ssl_context()
sock = socket.create_connection((hostname, 1965))
self.ssock = context.wrap_socket(sock)
der = self.ssock.getpeercert(binary_form=True)
self.cert_status, self.cert = \
validate_cert(der, hostname, self.cert_stash)
if self.cert_status == CertStatus.ERROR:
self.abort()
self.state = Request.STATE_ERROR_CERT
return False
if self.cert_status == CertStatus.WRONG_FINGERPRINT:
self.abort()
self.state = Request.STATE_UNTRUSTED_CERT
return False
if self.cert_status in CERT_STATUS_INVALID:
self.state = Request.STATE_INVALID_CERT
elif self.cert_status == CertStatus.VALID_NEW:
self.state = Request.STATE_UNKNOWN_CERT
else: # self.cert_status == CertStatus.VALID
self.state = Request.STATE_OK
return True
def abort(self):
"""Close the connection."""
self.ssock.close()
def proceed(self):
"""Complete the request: send the payload and return received data."""
self.ssock.sendall(self.payload)
response = b""
while True:
buf = self.ssock.recv(4096)
if not buf:
return response
response += buf
@staticmethod
def get_ssl_context():
"""Return a secure SSL context that is adequate for Gemini."""
context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
context.options |= ssl.OP_NO_TLSv1
context.options |= ssl.OP_NO_TLSv1_1
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
return context
class StatusCode(IntEnum):
UNKNOWN = 0
INPUT = 10
SENSITIVE_INPUT = 11
SUCCESS = 20
REDIRECT = 30
PERMANENT_REDIRECT = 31
TEMP_FAILURE = 40
SERVER_UNAVAILABLE = 41
CGI_ERROR = 42
PROXY_ERROR = 43
SLOW_DOWN = 44
PERM_FAILURE = 50
NOT_FOUND = 51
GONE = 52
PROXY_REQUEST_REFUSED = 53
BAD_REQUEST = 59
CERT_REQUIRED = 60
CERT_NOT_AUTHORISED = 61
CERT_NOT_VALID = 62
_missing_ = lambda _: StatusCode.UNKNOWN
@dataclass
class Response:
"""A Gemini response."""
code: StatusCode
meta: str = ""
content: bytes = b""
HEADER_RE = re.compile(r"(\d{2}) (\S*)")
@staticmethod
def parse(data):
"""Parse a received response."""
try:
response_header_len = data.index(LINE_TERM)
response_header = data[:response_header_len].decode()
except ValueError:
return None
match = Response.HEADER_RE.match(response_header)
if not match:
return None
code, meta = match.groups()
response = Response(StatusCode(code), meta=meta)
if Response.get_generic_code(response.code) == StatusCode.SUCCESS:
content_offset = response_header_len + len(LINE_TERM)
response.content = data[content_offset:]
elif response.code == StatusCode.UNKNOWN:
return None
return response
@staticmethod
def get_generic_code(code):
"""Return the generic version (x0) of this code."""
return code - (code % 10)

229
bebop/rendering.py Normal file
View file

@ -0,0 +1,229 @@
import curses
import string
from enum import IntEnum
from bebop.colors import ColorPairs
from bebop.gemtext import Blockquote, Link, Paragraph, Preformatted, Title
SPLIT_CHARS = " \t-"
JOIN_CHAR = "-"
class LineType(IntEnum):
"""Type of line.
Keep lines type along with the content for later rendering.
Title type values match the title level to avoid looking it up.
"""
NONE = 0
TITLE_1 = 1
TITLE_2 = 2
TITLE_3 = 3
PARAGRAPH = 4
LINK = 5
PREFORMATTED = 6
BLOCKQUOTE = 7
def format_elements(elements, width):
"""Format elements into a list of lines with metadata.
The returned list ("metalines") are tuples (meta, line), meta being a
dict of metadata and line a text line to display. Currently the only
metadata keys used are:
- type: one of the Renderer.TYPE constants.
- url: only for links, the URL the link on this line refers to. Note
that this key is present only for the first line of the link, i.e.
long link descriptions wrapped on multiple lines will not have a this
key except for the first line.
- link_id: only alongside "url" key, ID generated for this link.
"""
metalines = []
context = {"last_link_id": 0, "width": width}
separator = ({"type": LineType.NONE}, "")
has_margins = False
for index, element in enumerate(elements):
previous_had_margins = has_margins
has_margins = False
if isinstance(element, Title):
element_metalines = format_title(element, context)
has_margins = True
elif isinstance(element, Paragraph):
element_metalines = format_paragraph(element, context)
has_margins = True
elif isinstance(element, Link):
element_metalines = format_link(element, context)
elif isinstance(element, Preformatted):
element_metalines = format_preformatted(element, context)
has_margins = True
elif isinstance(element, Blockquote):
element_metalines = format_blockquote(element, context)
has_margins = True
else:
continue
# If current element requires margins and is not the first elements,
# separate from previous element. Also do it if the current element does
# not require margins but follows an element that required it (e.g. link
# after a paragraph).
if (
(has_margins and index > 0)
or (not has_margins and previous_had_margins)
):
metalines.append(separator)
metalines += element_metalines
return metalines
def format_title(title: Title, context: dict):
"""Return metalines for this title."""
if title.level == 1:
wrapped = wrap_words(title.text, context["width"])
line_template = f"{{:^{context['width']}}}"
lines = (line_template.format(line) for line in wrapped)
else:
if title.level == 2:
text = title.text
else:
text = " " + title.text
lines = wrap_words(text, context["width"])
# Title levels match the type constants of titles.
return [({"type": LineType(title.level)}, line) for line in lines]
def format_paragraph(paragraph: Paragraph, context: dict):
"""Return metalines for this paragraph."""
lines = wrap_words(paragraph.text, context["width"])
return [({"type": LineType.PARAGRAPH}, line) for line in lines]
def format_link(link: Link, context: dict):
"""Return metalines for this link."""
link_id = context["last_link_id"] + 1
context["last_link_id"] = link_id
link_text = link.text or link.url
text = f"[{link_id}] " + link_text
lines = wrap_words(text, context["width"])
first_line_meta = {
"type": LineType.LINK,
"url": link.url,
"link_id": link_id
}
first_line = [(first_line_meta, lines[0])]
other_lines = [({"type": LineType.LINK}, line) for line in lines[1:]]
return first_line + other_lines
def format_preformatted(preformatted: Preformatted, context: dict):
"""Return metalines for this preformatted block."""
return [
({"type": LineType.PREFORMATTED}, line)
for line in preformatted.lines
]
def format_blockquote(blockquote: Blockquote, context: dict):
"""Return metalines for this blockquote."""
lines = wrap_words(blockquote.text, context["width"])
return [({"type": LineType.BLOCKQUOTE}, line) for line in lines]
def wrap_words(text, width):
"""Wrap a text in several lines according to the renderer's width."""
lines = []
line = ""
words = _explode_words(text)
for word in words:
line_len, word_len = len(line), len(word)
# If adding the new word would overflow the line, use a new line.
if line_len + word_len > width:
# Push only non-empty lines.
if line_len > 0:
lines.append(line)
line = ""
# Force split words that are longer than the width.
while word_len > width:
lines.append(word[:width - 1] + JOIN_CHAR)
word = word[width - 1:]
word_len = len(word)
word = word.lstrip()
line += word
if line:
lines.append(line)
return lines
def _explode_words(text):
words = []
pos = 0
while True:
sep, sep_index = _find_next_sep(text[pos:])
if not sep:
words.append(text[pos:])
return words
word = text[pos : pos + sep_index]
# If the separator is not a space char, append it to the word.
if sep in string.whitespace:
words.append(word)
words.append(sep)
else:
words.append(word + sep)
pos += sep_index + 1
def _find_next_sep(text):
indices = []
for sep in SPLIT_CHARS:
try:
indices.append((sep, text.index(sep)))
except ValueError:
pass
if not indices:
return ("", 0)
return min(indices, key=lambda e: e[1])
def render_lines(metalines, window, max_width):
"""Write a list of metalines in window.
As this function does not know about the window/pad previous size, it
expects a cleared window, especially if the new content is shorter than the
previous one: merely clearing after the resize will not remove artefacts.
Arguments:
- metalines: list of metalines to render, must have at least one element.
- window: window that will be resized as filled with rendered lines.
- max_width: line length limit for the pad.
Return:
The tuple (height, width) of the resized window.
"""
num_lines = len(metalines)
window.resize(num_lines, max_width)
for line_index, metaline in enumerate(metalines):
meta, line = metaline
line = line[:max_width - 1]
line_type = meta["type"]
if line_type == LineType.TITLE_1:
attributes = curses.color_pair(ColorPairs.TITLE_1) | curses.A_BOLD
window.addstr(line, attributes)
elif line_type == LineType.TITLE_2:
attributes = curses.color_pair(ColorPairs.TITLE_2) | curses.A_BOLD
window.addstr(line, attributes)
elif line_type == LineType.TITLE_3:
window.addstr(line, curses.color_pair(ColorPairs.TITLE_3))
elif line_type == LineType.LINK:
window.addstr(line, curses.color_pair(ColorPairs.LINK))
elif line_type == LineType.PREFORMATTED:
window.addstr(line, curses.color_pair(ColorPairs.PREFORMATTED))
elif line_type == LineType.BLOCKQUOTE:
attributes = (
curses.color_pair(ColorPairs.BLOCKQUOTE)
| curses.A_ITALIC
)
window.addstr(line, attributes)
else: # includes LineType.PARAGRAPH
window.addstr(line)
if line_index < num_lines - 1:
window.addstr("\n")
return num_lines, max_width

425
bebop/screen.py Normal file
View file

@ -0,0 +1,425 @@
import curses
import curses.ascii
import curses.textpad
import os
from bebop.colors import ColorPair, init_colors
from bebop.gemtext import parse_gemtext
from bebop.mouse import ButtonState
from bebop.navigation import join_url, parse_url
from bebop.protocol import Request, Response
from bebop.rendering import format_elements, render_lines
class Screen:
MAX_COLS = 1000
def __init__(self, cert_stash):
self.stash = cert_stash
self.screen = None
self.dim = (0, 0)
self.content_pad = None
self.content_pad_dim = (0, 0)
self.status_window = None
self.command_window = None
self.command_textbox = None
self.metalines = []
self.current_url = ""
self.current_line = 0
self.current_column = 0
self.links = {}
self.status_data = ("", 0)
@property
def h(self):
return self.dim[0]
@property
def w(self):
return self.dim[1]
def run(self, *args, **kwargs):
"""Use curses' wrapper around _run."""
os.environ.setdefault("ESCDELAY", "25")
curses.wrapper(self._run, *args, **kwargs)
def _run(self, stdscr, start_url=None):
"""Start displaying content and handling events."""
self.screen = stdscr
self.screen.clear()
self.screen.refresh()
init_colors()
curses.mousemask(curses.ALL_MOUSE_EVENTS)
self.dim = self.screen.getmaxyx()
self.content_pad_dim = (self.h - 2, Screen.MAX_COLS)
self.content_pad = curses.newpad(*self.content_pad_dim)
self.content_pad.scrollok(True)
self.content_pad.idlok(True)
self.status_window = self.screen.subwin(
*self.line_dim,
*self.status_window_pos,
)
self.command_window = self.screen.subwin(
*self.line_dim,
*self.command_window_pos,
)
curses.curs_set(0)
pending_url = start_url
running = True
while running:
if pending_url:
self.open_gemini_url(pending_url)
pending_url = None
char = self.screen.getch()
if char == ord("q"):
running = False
elif char == ord(":"):
command = self.input_common_command()
self.set_status(f"Command: {command}")
elif char == ord("s"):
self.set_status(f"h {self.h} w {self.w} cl {self.current_line} cc {self.current_column}")
elif char == ord("r"):
self.refresh_content()
elif char == ord("h"):
if self.current_column > 0:
self.current_column -= 1
self.refresh_content()
elif char == ord("j"):
self.scroll_content(1)
elif char == ord("k"):
self.scroll_content(1, scroll_up=True)
elif char == ord("l"):
if self.current_column < Screen.MAX_COLS - self.w:
self.current_column += 1
self.refresh_content()
elif curses.ascii.isdigit(char):
self.handle_digit_input(char)
elif char == curses.KEY_MOUSE:
self.handle_mouse(*curses.getmouse())
elif char == curses.KEY_RESIZE:
self.handle_resize()
@property
def content_window_refresh_size(self):
return self.h - 3, self.w - 1
@property
def status_window_pos(self):
return self.h - 2, 0
@property
def command_window_pos(self):
return self.h - 1, 0
@property
def line_dim(self):
return 1, self.w
def refresh_windows(self):
self.refresh_content()
self.refresh_status()
self.clear_command()
def refresh_content(self):
"""Refresh content pad's view using the current line/column."""
refresh_size = self.content_window_refresh_size
if refresh_size[0] <= 0 or refresh_size[1] <= 0:
return
self.content_pad.refresh(
self.current_line, self.current_column, 0, 0, *refresh_size
)
def refresh_status(self):
"""Refresh status line contents."""
text, pair = self.status_data
text = text[:self.w - 1]
self.status_window.addstr(0, 0, text, curses.color_pair(pair))
self.status_window.clrtoeol()
self.status_window.refresh()
def scroll_content(self, num_lines: int, scroll_up: bool =False):
"""Make the content pad scroll up and down by *num_lines*."""
if scroll_up:
min_line = 0
if self.current_line > min_line:
self.current_line = max(self.current_line - num_lines, min_line)
self.refresh_content()
else:
max_line = self.content_pad_dim[0] - self.h + 2
if self.current_line < max_line:
self.current_line = min(self.current_line + num_lines, max_line)
self.refresh_content()
def set_status(self, text):
"""Set a regular message in the status bar."""
self.status_data = text, ColorPair.NORMAL
self.refresh_status()
def set_status_error(self, text):
"""Set an error message in the status bar."""
self.status_data = f"Error: {text}", ColorPair.ERROR
self.refresh_status()
def open_url(self, url):
"""Try to open an URL.
If the URL is not strictly absolute, it will be opened relatively to the
current URL, unless there is no current URL yet.
"""
if self.current_url:
parts = parse_url(url)
else:
parts = parse_url(url, absolute=True)
if parts.scheme == "gemini":
if not parts.netloc:
url = join_url(self.current_url, url)
self.open_gemini_url(url)
else:
self.set_status_error(f"protocol {parts.scheme} not supported.")
def open_gemini_url(self, url):
"""Open a Gemini URL and set the formatted response as content."""
self.set_status(f"Loading {url}")
req = Request(url, self.stash)
connected = req.connect()
if not connected:
if req.state == Request.STATE_ERROR_CERT:
self.set_status_error("certificate was missing or corrupt.")
elif req.state == Request.STATE_UNTRUSTED_CERT:
self.set_status_error("certificate has been changed.")
# TODO propose the user ways to handle this.
else:
self.set_status_error("connection failed.")
return
if req.state == Request.STATE_INVALID_CERT:
# TODO propose abort / temp trust
pass
elif req.state == Request.STATE_UNKNOWN_CERT:
# TODO propose abort / temp trust / perm trust
pass
else:
pass # TODO
response = Response.parse(req.proceed())
if not response:
self.set_status_error("server response parsing failed.")
return
if response.code != 20:
self.set_status_error(f"unknown response code {response.code}.")
return
self.set_status(url)
self.current_url = url
self.show_gemtext(response.content)
def show_gemtext(self, gemtext: bytes):
"""Render Gemtext data in the content pad."""
elements = parse_gemtext(gemtext)
self.metalines = format_elements(elements, 80)
self.links = {
meta["link_id"]: meta["url"]
for meta, _ in self.metalines
if "link_id" in meta and "url" in meta
}
self.content_pad.clear()
h, w = render_lines(self.metalines, self.content_pad, Screen.MAX_COLS)
self.content_pad_dim = (h, w)
self.current_line = 0
self.current_column = 0
self.refresh_content()
def focus_command(self, command_char, validator=None, prefix=""):
"""Give user focus to the command bar.
Show the command char and give focus to the command textbox. The
validator function is passed to the textbox.
Arguments:
- command_char: char to display before the command line.
- validator: function to use to validate the input chars.
- prefix: string to insert before the cursor in the command line.
Returns:
User input as string. The string will be empty if the validator raised
an EscapeInterrupt.
"""
assert self.command_window is not None
self.command_window.clear()
self.command_window.refresh()
self.command_textbox = curses.textpad.Textbox(self.command_window)
self.command_window.addstr(command_char + prefix)
curses.curs_set(1)
try:
command = self.command_textbox.edit(validator)[1:]
except EscapeCommandInterrupt:
command = ""
except TerminateCommandInterrupt as exc:
command = exc.command
curses.curs_set(0)
self.clear_command()
return command
def gather_current_command(self):
"""Return the string currently written by the user in command line."""
return self.command_textbox.gather()[1:].rstrip()
def clear_command(self):
"""Clear the command line """
self.command_window.clear()
self.command_window.refresh()
self.screen.delch(self.h - 1, 0)
self.screen.refresh()
def input_common_command(self):
"""Focus command line to type a regular command. Currently useless."""
return self.focus_command(":", self.validate_common_char)
def validate_common_char(self, ch: int):
"""Generic input validator, handles a few more cases than default.
This validator can be used as a default validator as it handles, on top
of the Textbox defaults:
- Erasing the first command char, i.e. clearing the line, cancels the
command input.
- Pressing ESC also cancels the input.
This validator can be safely called at the beginning of other validators
to handle the keys above.
"""
if ch == curses.KEY_BACKSPACE: # Cancel input if all line is cleaned.
text = self.gather_current_command()
if len(text) == 0:
raise EscapeCommandInterrupt()
elif ch == curses.ascii.ESC: # Could be ESC or ALT
self.screen.nodelay(True)
ch = self.screen.getch()
if ch == -1:
raise EscapeCommandInterrupt()
self.screen.nodelay(False)
return ch
def handle_digit_input(self, init_char: int):
"""Handle a initial digit input by the user.
When a digit key is pressed, the user intents to visit a link (or
dropped something on the numpad). To reduce the number of key types
needed, Bebop uses the following algorithm:
- If the highest link ID on the page is less than 10, pressing the key
takes you to the link.
- If it's higher than 10, the user either inputs as many digits required
to disambiguate the link ID, or press enter to validate her input.
Examples
- I have 3 links. Pressing "2" takes me to link 2.
- I have 15 links. Pressing "3" and Enter takes me to link 2.
- I have 15 links. Pressing "1" and "2" takes me to link 12 (no
ambiguity, so Enter is not required).
- I have 456 links. Pressing "1", "2" and Enter takes me to link 12.
- I have 456 links. Pressing "1", "2" and "6" takes me to link 126 (no
ambiguity as well).
"""
digit = init_char & 0xf
num_links = len(self.links)
if num_links < 10:
self.open_link(digit)
return
required_digits = 0
while num_links:
required_digits += 1
num_links //= 10
link_input = self.focus_command(
"~",
validator=lambda ch: self._validate_link_digit(ch, required_digits),
prefix=chr(init_char),
)
try:
link_id = int(link_input)
except ValueError:
self.set_status_error("invalid link ID")
return
self.open_link(link_id)
def _validate_link_digit(self, ch: int, required_digits: int):
"""Handle input chars to be used as link ID."""
# Handle common chars.
ch = self.validate_common_char(ch)
# Only accept digits. If we reach the amount of required digits, open
# link now and leave command line. Else just process it.
if curses.ascii.isdigit(ch):
digits = self.gather_current_command()
if len(digits) + 1 == required_digits:
raise TerminateCommandInterrupt(digits + chr(ch))
return ch
# If not a digit but a printable character, ignore it.
if curses.ascii.isprint(ch):
return 0
# Everything else could be a control character and should be processed.
return ch
def disambiguate_link_id(self, digits: str, max_digits: int):
if len(digits) == max_digits:
return digits
def open_link(self, link_id: int):
"""Open the link with this link ID."""
if not link_id in self.links:
self.set_status_error(f"unknown link ID {link_id}.")
return
self.open_url(self.links[link_id])
def handle_mouse(self, mouse_id: int, x: int, y: int, z: int, bstate: int):
"""Handle mouse events.
Right now, only vertical scrolling is handled.
"""
if bstate & ButtonState.SCROLL_UP:
self.scroll_content(3, scroll_up=True)
elif bstate & ButtonState.SCROLL_DOWN:
self.scroll_content(3)
def handle_resize(self):
"""Try to not make everything collapse on resizes."""
# Refresh the whole screen before changing windows to avoid random
# blank screens.
self.screen.refresh()
old_dim = self.dim
self.dim = self.screen.getmaxyx()
# Avoid work if the resizing does not impact us.
if self.dim == old_dim:
return
# Resize windows to fit the new dimensions. Content pad will be updated
# on its own at the end of the function.
self.status_window.resize(*self.line_dim)
self.command_window.resize(*self.line_dim)
# Move the windows to their new position if that's still possible.
if self.status_window_pos[0] >= 0:
self.status_window.mvwin(*self.status_window_pos)
if self.command_window_pos[0] >= 0:
self.command_window.mvwin(*self.command_window_pos)
# If the content pad does not fit its whole place, we have to clean the
# gap between it and the status line. Refresh all screen.
if self.content_pad_dim[0] < self.h - 2:
self.screen.clear()
self.screen.refresh()
self.refresh_windows()
class EscapeCommandInterrupt(Exception):
"""Signal that ESC has been pressed during command line."""
pass
class TerminateCommandInterrupt(Exception):
"""Signal that validation ended command line input early. Use `command`."""
def __init__(self, command: str, *args, **kwargs):
super().__init__(*args, **kwargs)
self.command = command

0
bebop/tests/__init__.py Normal file
View file

View file

@ -0,0 +1,30 @@
import unittest
from ..navigation import join_url, parse_url
class TestNavigation(unittest.TestCase):
def test_parse_url(self):
res = parse_url("gemini://dece.space/parse-me.gmi")
self.assertEqual(res.scheme, "gemini")
self.assertEqual(res.netloc, "dece.space")
self.assertEqual(res.path, "/parse-me.gmi")
res_netloc = parse_url("//dece.space/parse-me.gmi")
self.assertEqual(res, res_netloc)
res = parse_url("dece.space/parse-me.gmi", absolute=True)
self.assertEqual(res.scheme, "gemini")
self.assertEqual(res.netloc, "dece.space")
self.assertEqual(res.path, "/parse-me.gmi")
def test_join_url(self):
url = join_url("gemini://dece.space", "some-file.gmi")
self.assertEqual(url, "gemini://dece.space/some-file.gmi")
url = join_url("gemini://dece.space", "some-file.gmi")
self.assertEqual(url, "gemini://dece.space/some-file.gmi")
url = join_url("gemini://dece.space/dir1/file.gmi", "other-file.gmi")
self.assertEqual(url, "gemini://dece.space/dir1/other-file.gmi")
url = join_url("gemini://dece.space/dir1/file.gmi", "../top-level.gmi")
self.assertEqual(url, "gemini://dece.space/top-level.gmi")

View file

@ -0,0 +1,10 @@
import unittest
from ..protocol import parse_gemini_url
class TestGemini(unittest.TestCase):
def test_parse_url(self):
r1 = parse_gemini_url("gemini://dece.space")
self.assertDictEqual(r1, {"host": "dece.space", "path": ""})

View file

@ -0,0 +1,41 @@
import unittest
from ..rendering import _explode_words, _find_next_sep, wrap_words
class TestRenderer(unittest.TestCase):
def test_wrap_words(self):
t = "wrap me wrap me youcantwrapthisonewithoutforce bla bla bla bla"
lines = wrap_words(t, 10)
expected = [
"wrap me ",
"wrap me ",
"youcantwr-",
"apthisone-",
"withoutfo-",
"rce bla ",
"bla bla ",
"bla",
]
self.assertListEqual(lines, expected)
def test_explode_words(self):
t = "unsplittableword word-dash tabatmyleft dot.sep"
words = _explode_words(t)
expected = [
"unsplittableword", " ", "word-", "dash", " ", "tabatmyleft",
" ", "dot.sep"
]
self.assertListEqual(words, expected)
def test_find_next_sep(self):
t = "unsplittableword word-dash"
sep, index = _find_next_sep(t)
self.assertEqual((sep, index), (" ", 16))
t = t[17:]
sep, index = _find_next_sep(t)
self.assertEqual((sep, index), ("-", 4))
t = t[5:]
sep, index = _find_next_sep(t)
self.assertEqual((sep, index), ("", 0))

95
bebop/tofu.py Normal file
View file

@ -0,0 +1,95 @@
import datetime
import hashlib
import re
from enum import Enum
import asn1crypto.x509
STASH_LINE_RE = re.compile(r"(\S+) (\S+) (\S+) (\d+)")
def load_cert_stash(stash_path):
"""Load the certificate stash from the file, or None on error.
The stash is a dict with host names as keys and tuples as values. Tuples
have four elements:
- the fingerprint algorithm (only SHA-512 is supported),
- the fingerprint as an hexstring,
- the timestamp of the expiration date,
- a boolean that is True when the stash is loaded from a file, i.e. always
true for entries loaded in this function, but should be false when it
concerns a certificate temporary trusted for the session only; this flag
is used to decide whether to save the certificate in the stash at exit.
"""
stash = {}
try:
with open(stash_path, "rt") as stash_file:
for line in stash_file:
match = STASH_LINE_RE.match(line)
if not match:
continue
name, algo, fingerprint, timestamp = match.groups()
stash[name] = (algo, fingerprint, timestamp, True)
except (OSError, ValueError):
return None
return stash
class CertStatus(Enum):
"""Value returned by validate_cert."""
# Cert is valid: proceed.
VALID = 0 # Known and valid.
VALID_NEW = 7 # New and valid.
# Cert is unusable or wrong: abort.
ERROR = 1 # General error.
WRONG_FINGERPRINT = 2 # Fingerprint in the stash is different.
# Cert has some issues: ask to proceed.
NOT_VALID_YET = 3 # not-before date invalid.
EXPIRED = 4 # not-after date invalid.
BAD_DOMAIN = 5 # Host name is not in cert's valid domains.
CERT_STATUS_INVALID = (
CertStatus.NOT_VALID_YET,
CertStatus.EXPIRED,
CertStatus.BAD_DOMAIN,
)
def validate_cert(der, hostname, cert_stash):
"""Return a tuple (CertStatus, Certificate) for this certificate."""
if der is None:
return CertStatus.ERROR, None
try:
cert = asn1crypto.x509.Certificate.load(der)
except ValueError:
return CertStatus.ERROR, None
# Check for sane parameters.
now = datetime.datetime.now(tz=datetime.timezone.utc)
if now < cert.not_valid_before:
return CertStatus.NOT_VALID_YET, cert
if now > cert.not_valid_after:
return CertStatus.EXPIRED, cert
if hostname not in cert.valid_domains:
return CertStatus.BAD_DOMAIN, cert
# Check the entire certificate fingerprint.
cert_hash = hashlib.sha512(der).hexdigest()
if hostname in cert_stash:
_, fingerprint, timestamp, _ = cert_stash[hostname]
if timestamp >= now.timestamp():
if cert_hash != fingerprint:
return CertStatus.WRONG_FINGERPRINT, cert
else:
# Disregard expired fingerprints.
pass
return CertStatus.VALID, cert
# The certificate is unknown and valid.
return CertStatus.VALID_NEW, cert
def trust(cert_stash, hostname, algo, fingerprint, timestamp,
trust_always=False):
cert_stash[hostname] = (algo, fingerprint, timestamp, trust_always)

1
requirements.txt Normal file
View file

@ -0,0 +1 @@
asn1crypto