From 6d676d0471e17e4b829d3ed56e71e50b717de5ee Mon Sep 17 00:00:00 2001 From: dece Date: Fri, 12 Feb 2021 19:01:42 +0100 Subject: [PATCH] init: basic protocol/nav/rendering --- .gitignore | 3 + bebop/__init__.py | 0 bebop/__main__.py | 23 ++ bebop/colors.py | 29 +++ bebop/gemtext.py | 79 ++++++ bebop/mouse.py | 28 +++ bebop/navigation.py | 34 +++ bebop/protocol.py | 175 ++++++++++++++ bebop/rendering.py | 229 ++++++++++++++++++ bebop/screen.py | 425 +++++++++++++++++++++++++++++++++ bebop/tests/__init__.py | 0 bebop/tests/test_navigation.py | 30 +++ bebop/tests/test_protocol.py | 10 + bebop/tests/test_rendering.py | 41 ++++ bebop/tofu.py | 95 ++++++++ requirements.txt | 1 + 16 files changed, 1202 insertions(+) create mode 100644 .gitignore create mode 100644 bebop/__init__.py create mode 100644 bebop/__main__.py create mode 100644 bebop/colors.py create mode 100644 bebop/gemtext.py create mode 100644 bebop/mouse.py create mode 100644 bebop/navigation.py create mode 100644 bebop/protocol.py create mode 100644 bebop/rendering.py create mode 100644 bebop/screen.py create mode 100644 bebop/tests/__init__.py create mode 100644 bebop/tests/test_navigation.py create mode 100644 bebop/tests/test_protocol.py create mode 100644 bebop/tests/test_rendering.py create mode 100644 bebop/tofu.py create mode 100644 requirements.txt diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..dd3015f --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +*.pyc + +/venv/ diff --git a/bebop/__init__.py b/bebop/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/bebop/__main__.py b/bebop/__main__.py new file mode 100644 index 0000000..76eee12 --- /dev/null +++ b/bebop/__main__.py @@ -0,0 +1,23 @@ +import argparse + +from bebop.tofu import load_cert_stash +from bebop.screen import Screen + + +def main(): + argparser = argparse.ArgumentParser() + argparser.add_argument("url", default=None) + args = argparser.parse_args() + + if args.url: + start_url = args.url + if not start_url.startswith("gemini://"): + start_url = "gemini://" + start_url + else: + start_url = None + + cert_stash = load_cert_stash("/tmp/stash") + Screen(cert_stash).run(start_url=start_url) + + +main() diff --git a/bebop/colors.py b/bebop/colors.py new file mode 100644 index 0000000..69554bd --- /dev/null +++ b/bebop/colors.py @@ -0,0 +1,29 @@ +import curses +from enum import IntEnum + + +class ColorPair(IntEnum): + NORMAL = 0 + ERROR = 1 + LINK = 2 + LINK_ID = 3 + TITLE_1 = 4 + TITLE_2 = 5 + TITLE_3 = 6 + PREFORMATTED = 7 + BLOCKQUOTE = 8 + DEBUG = 99 + + +def init_colors(): + curses.use_default_colors() + curses.init_pair(ColorPair.NORMAL, curses.COLOR_WHITE, -1) + curses.init_pair(ColorPair.ERROR, curses.COLOR_RED, -1) + curses.init_pair(ColorPair.LINK, curses.COLOR_CYAN, -1) + curses.init_pair(ColorPair.LINK_ID, curses.COLOR_WHITE, -1) + curses.init_pair(ColorPair.TITLE_1, curses.COLOR_GREEN, -1) + curses.init_pair(ColorPair.TITLE_2, curses.COLOR_MAGENTA, -1) + curses.init_pair(ColorPair.TITLE_3, curses.COLOR_MAGENTA, -1) + curses.init_pair(ColorPair.PREFORMATTED, curses.COLOR_YELLOW, -1) + curses.init_pair(ColorPair.BLOCKQUOTE, curses.COLOR_CYAN, -1) + curses.init_pair(ColorPair.DEBUG, curses.COLOR_BLACK, curses.COLOR_GREEN) diff --git a/bebop/gemtext.py b/bebop/gemtext.py new file mode 100644 index 0000000..44e0fac --- /dev/null +++ b/bebop/gemtext.py @@ -0,0 +1,79 @@ +import re +import typing +from dataclasses import dataclass + + +@dataclass +class Paragraph: + text: str + + +@dataclass +class Title: + level: int + text: str + RE = re.compile(r"(#{1,3})\s+(.+)") + + +@dataclass +class Link: + url: str + text: str + RE = re.compile(r"=>\s*(?P\S+)(\s+(?P.+))?") + + +@dataclass +class Preformatted: + lines: typing.List[str] + FENCE = "```" + + +@dataclass +class Blockquote: + text: str + RE = re.compile(r">\s*(.*)") + + +def parse_gemtext(data): + """Parse UTF-8 encoded Gemtext as a list of elements.""" + text = data.decode(encoding="utf8", errors="ignore") + elements = [] + preformatted = None + for line in text.splitlines(): + line = line.rstrip() + if not line: + continue + + match = Title.RE.match(line) + if match: + hashtags, text = match.groups() + elements.append(Title(hashtags.count("#"), text)) + continue + + match = Link.RE.match(line) + if match: + match_dict = match.groupdict() + url, text = match_dict["url"], match_dict.get("text", "") + elements.append(Link(url, text)) + continue + + if line == Preformatted.FENCE: + if preformatted: + elements.append(preformatted) + preformatted = None + else: + preformatted = Preformatted([]) + continue + + match = Blockquote.RE.match(line) + if match: + text = match.groups()[0] + elements.append(Blockquote(text)) + continue + + if preformatted: + preformatted.lines.append(line) + else: + elements.append(Paragraph(line)) + + return elements diff --git a/bebop/mouse.py b/bebop/mouse.py new file mode 100644 index 0000000..e95f66e --- /dev/null +++ b/bebop/mouse.py @@ -0,0 +1,28 @@ +from enum import IntEnum + + +class ButtonState(IntEnum): + """Most common flags from curses.getmouse()'s bstate. + + Could not find a clear reference for that and released/pressed seem inverted + compared to snippets on the Web, so take portability with a grain of salt. + """ + LEFT_RELEASED = 1 << 0 + LEFT_PRESSED = 1 << 1 + LEFT_CLICKED = 1 << 2 + LEFT_DCLICKED = 1 << 3 + LEFT_TCLICKED = 1 << 4 + MIDDLE_RELEASED = 1 << 5 + MIDDLE_PRESSED = 1 << 6 + MIDDLE_CLICKED = 1 << 7 + MIDDLE_DCLICKED = 1 << 8 + MIDDLE_TCLICKED = 1 << 9 + RIGHT_RELEASED = 1 << 10 + RIGHT_PRESSED = 1 << 11 + RIGHT_CLICKED = 1 << 12 + RIGHT_DCLICKED = 1 << 13 + RIGHT_TCLICKED = 1 << 14 + SCROLL_UP = 1 << 16 + SCROLL_DOWN = 1 << 21 + SHIFT = 1 << 25 + CTRL = 1 << 26 diff --git a/bebop/navigation.py b/bebop/navigation.py new file mode 100644 index 0000000..9d75d0d --- /dev/null +++ b/bebop/navigation.py @@ -0,0 +1,34 @@ +import re +import urllib.parse + + +def parse_url(url, absolute=False): + """Return URL parts from this URL. + + This uses urllib.parse.urlparse to not reinvent the wheel, with a few + adjustments. + + First, urllib does not know the Gemini scheme (yet!) so if it + is specified we strip it to get an absolute netloc. + + Second, as this function can be used to process arbitrary user input, we + clean it a bit: + - strip whitespaces from the URL + - if "absolute" is True, consider that the URL is meant to be absolute, even + though it technically is not, e.g. "dece.space" is not absolute as it + misses either the // delimiter. + """ + if url.startswith("gemini://"): + url = url[7:] + parts = urllib.parse.urlparse(url, scheme="gemini") + if not parts.netloc and absolute: + parts = urllib.parse.urlparse(f"//{url}", scheme="gemini") + return parts + + +def join_url(base_url, url): + """Join a base URL with a relative url.""" + if base_url.startswith("gemini://"): + base_url = base_url[7:] + parts = parse_url(urllib.parse.urljoin(base_url, url)) + return urllib.parse.urlunparse(parts) diff --git a/bebop/protocol.py b/bebop/protocol.py new file mode 100644 index 0000000..fa734fb --- /dev/null +++ b/bebop/protocol.py @@ -0,0 +1,175 @@ +import re +import socket +import ssl +from dataclasses import dataclass +from enum import IntEnum + +from bebop.tofu import CertStatus, CERT_STATUS_INVALID, validate_cert + + +GEMINI_URL_RE = re.compile(r"gemini://(?P[^/]+)(?P.*)") +LINE_TERM = b"\r\n" + + +def parse_gemini_url(url): + """Return a dict containing the hostname and the request path, or None.""" + match = GEMINI_URL_RE.match(url) + return match.groupdict() if match else None + + +class Request: + """A Gemini request.""" + + # Initial state, connection is not established yet. + STATE_INIT = 0 + # An error has occured during cert verification, connection is aborted. + STATE_ERROR_CERT = 1 + # An invalid URL has been provided, connection is aborted. + STATE_INVALID_URL = 2 + # Invalid cert: user should abort or temporarily trust the cert. + STATE_INVALID_CERT = 3 + # Unknown cert: user should abort, temporarily or always trust the cert. + STATE_UNKNOWN_CERT = 4 + # Untrusted cert: connection is aborted, manually edit the stash. + STATE_UNTRUSTED_CERT = 5 + # Valid and trusted cert: proceed. + STATE_OK = 6 + + def __init__(self, url, cert_stash): + self.url = url + self.cert_stash = cert_stash + self.state = Request.STATE_INIT + self.payload = b"" + self.ssock = None + self.cert = None + self.cert_status = None + + def connect(self): + """Connect to a Gemini server and return a RequestEventType. + + Return True if the connection is established. The caller has to verify + the request state and propose appropriate choices to the user if the + certificate status is not CertStatus.VALID (Request.STATE_OK). + + If connect returns False, the secure socket is aborted before return. If + connect returns True, it is up to the caller to decide whether to + continue (call proceed) the connection or abort it (call abort). + """ + url_parts = parse_gemini_url(self.url) + if not url_parts: + self.state = Request.STATE_INVALID_URL + return False + hostname = url_parts["host"] + + try: + self.payload = self.url.encode() + except ValueError: + self.state = Request.STATE_INVALID_URL + return False + self.payload += LINE_TERM + + context = Request.get_ssl_context() + sock = socket.create_connection((hostname, 1965)) + self.ssock = context.wrap_socket(sock) + der = self.ssock.getpeercert(binary_form=True) + self.cert_status, self.cert = \ + validate_cert(der, hostname, self.cert_stash) + if self.cert_status == CertStatus.ERROR: + self.abort() + self.state = Request.STATE_ERROR_CERT + return False + if self.cert_status == CertStatus.WRONG_FINGERPRINT: + self.abort() + self.state = Request.STATE_UNTRUSTED_CERT + return False + + if self.cert_status in CERT_STATUS_INVALID: + self.state = Request.STATE_INVALID_CERT + elif self.cert_status == CertStatus.VALID_NEW: + self.state = Request.STATE_UNKNOWN_CERT + else: # self.cert_status == CertStatus.VALID + self.state = Request.STATE_OK + return True + + def abort(self): + """Close the connection.""" + self.ssock.close() + + def proceed(self): + """Complete the request: send the payload and return received data.""" + self.ssock.sendall(self.payload) + response = b"" + while True: + buf = self.ssock.recv(4096) + if not buf: + return response + response += buf + + @staticmethod + def get_ssl_context(): + """Return a secure SSL context that is adequate for Gemini.""" + context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) + context.options |= ssl.OP_NO_TLSv1 + context.options |= ssl.OP_NO_TLSv1_1 + context.check_hostname = False + context.verify_mode = ssl.CERT_NONE + return context + + +class StatusCode(IntEnum): + UNKNOWN = 0 + INPUT = 10 + SENSITIVE_INPUT = 11 + SUCCESS = 20 + REDIRECT = 30 + PERMANENT_REDIRECT = 31 + TEMP_FAILURE = 40 + SERVER_UNAVAILABLE = 41 + CGI_ERROR = 42 + PROXY_ERROR = 43 + SLOW_DOWN = 44 + PERM_FAILURE = 50 + NOT_FOUND = 51 + GONE = 52 + PROXY_REQUEST_REFUSED = 53 + BAD_REQUEST = 59 + CERT_REQUIRED = 60 + CERT_NOT_AUTHORISED = 61 + CERT_NOT_VALID = 62 + _missing_ = lambda _: StatusCode.UNKNOWN + + +@dataclass +class Response: + """A Gemini response.""" + + code: StatusCode + meta: str = "" + content: bytes = b"" + + HEADER_RE = re.compile(r"(\d{2}) (\S*)") + + @staticmethod + def parse(data): + """Parse a received response.""" + try: + response_header_len = data.index(LINE_TERM) + response_header = data[:response_header_len].decode() + except ValueError: + return None + match = Response.HEADER_RE.match(response_header) + if not match: + return None + code, meta = match.groups() + response = Response(StatusCode(code), meta=meta) + if Response.get_generic_code(response.code) == StatusCode.SUCCESS: + content_offset = response_header_len + len(LINE_TERM) + response.content = data[content_offset:] + elif response.code == StatusCode.UNKNOWN: + return None + return response + + @staticmethod + def get_generic_code(code): + """Return the generic version (x0) of this code.""" + return code - (code % 10) diff --git a/bebop/rendering.py b/bebop/rendering.py new file mode 100644 index 0000000..375181c --- /dev/null +++ b/bebop/rendering.py @@ -0,0 +1,229 @@ +import curses +import string +from enum import IntEnum + +from bebop.colors import ColorPairs +from bebop.gemtext import Blockquote, Link, Paragraph, Preformatted, Title + + +SPLIT_CHARS = " \t-" +JOIN_CHAR = "-" + + +class LineType(IntEnum): + """Type of line. + + Keep lines type along with the content for later rendering. + Title type values match the title level to avoid looking it up. + """ + NONE = 0 + TITLE_1 = 1 + TITLE_2 = 2 + TITLE_3 = 3 + PARAGRAPH = 4 + LINK = 5 + PREFORMATTED = 6 + BLOCKQUOTE = 7 + + +def format_elements(elements, width): + """Format elements into a list of lines with metadata. + + The returned list ("metalines") are tuples (meta, line), meta being a + dict of metadata and line a text line to display. Currently the only + metadata keys used are: + - type: one of the Renderer.TYPE constants. + - url: only for links, the URL the link on this line refers to. Note + that this key is present only for the first line of the link, i.e. + long link descriptions wrapped on multiple lines will not have a this + key except for the first line. + - link_id: only alongside "url" key, ID generated for this link. + """ + metalines = [] + context = {"last_link_id": 0, "width": width} + separator = ({"type": LineType.NONE}, "") + has_margins = False + for index, element in enumerate(elements): + previous_had_margins = has_margins + has_margins = False + if isinstance(element, Title): + element_metalines = format_title(element, context) + has_margins = True + elif isinstance(element, Paragraph): + element_metalines = format_paragraph(element, context) + has_margins = True + elif isinstance(element, Link): + element_metalines = format_link(element, context) + elif isinstance(element, Preformatted): + element_metalines = format_preformatted(element, context) + has_margins = True + elif isinstance(element, Blockquote): + element_metalines = format_blockquote(element, context) + has_margins = True + else: + continue + # If current element requires margins and is not the first elements, + # separate from previous element. Also do it if the current element does + # not require margins but follows an element that required it (e.g. link + # after a paragraph). + if ( + (has_margins and index > 0) + or (not has_margins and previous_had_margins) + ): + metalines.append(separator) + metalines += element_metalines + return metalines + + +def format_title(title: Title, context: dict): + """Return metalines for this title.""" + if title.level == 1: + wrapped = wrap_words(title.text, context["width"]) + line_template = f"{{:^{context['width']}}}" + lines = (line_template.format(line) for line in wrapped) + else: + if title.level == 2: + text = title.text + else: + text = " " + title.text + lines = wrap_words(text, context["width"]) + # Title levels match the type constants of titles. + return [({"type": LineType(title.level)}, line) for line in lines] + + +def format_paragraph(paragraph: Paragraph, context: dict): + """Return metalines for this paragraph.""" + lines = wrap_words(paragraph.text, context["width"]) + return [({"type": LineType.PARAGRAPH}, line) for line in lines] + + +def format_link(link: Link, context: dict): + """Return metalines for this link.""" + link_id = context["last_link_id"] + 1 + context["last_link_id"] = link_id + link_text = link.text or link.url + text = f"[{link_id}] " + link_text + lines = wrap_words(text, context["width"]) + first_line_meta = { + "type": LineType.LINK, + "url": link.url, + "link_id": link_id + } + first_line = [(first_line_meta, lines[0])] + other_lines = [({"type": LineType.LINK}, line) for line in lines[1:]] + return first_line + other_lines + + +def format_preformatted(preformatted: Preformatted, context: dict): + """Return metalines for this preformatted block.""" + return [ + ({"type": LineType.PREFORMATTED}, line) + for line in preformatted.lines + ] + + +def format_blockquote(blockquote: Blockquote, context: dict): + """Return metalines for this blockquote.""" + lines = wrap_words(blockquote.text, context["width"]) + return [({"type": LineType.BLOCKQUOTE}, line) for line in lines] + + +def wrap_words(text, width): + """Wrap a text in several lines according to the renderer's width.""" + lines = [] + line = "" + words = _explode_words(text) + for word in words: + line_len, word_len = len(line), len(word) + # If adding the new word would overflow the line, use a new line. + if line_len + word_len > width: + # Push only non-empty lines. + if line_len > 0: + lines.append(line) + line = "" + # Force split words that are longer than the width. + while word_len > width: + lines.append(word[:width - 1] + JOIN_CHAR) + word = word[width - 1:] + word_len = len(word) + word = word.lstrip() + line += word + if line: + lines.append(line) + return lines + + +def _explode_words(text): + words = [] + pos = 0 + while True: + sep, sep_index = _find_next_sep(text[pos:]) + if not sep: + words.append(text[pos:]) + return words + word = text[pos : pos + sep_index] + # If the separator is not a space char, append it to the word. + if sep in string.whitespace: + words.append(word) + words.append(sep) + else: + words.append(word + sep) + pos += sep_index + 1 + + +def _find_next_sep(text): + indices = [] + for sep in SPLIT_CHARS: + try: + indices.append((sep, text.index(sep))) + except ValueError: + pass + if not indices: + return ("", 0) + return min(indices, key=lambda e: e[1]) + + +def render_lines(metalines, window, max_width): + """Write a list of metalines in window. + + As this function does not know about the window/pad previous size, it + expects a cleared window, especially if the new content is shorter than the + previous one: merely clearing after the resize will not remove artefacts. + + Arguments: + - metalines: list of metalines to render, must have at least one element. + - window: window that will be resized as filled with rendered lines. + - max_width: line length limit for the pad. + + Return: + The tuple (height, width) of the resized window. + """ + num_lines = len(metalines) + window.resize(num_lines, max_width) + for line_index, metaline in enumerate(metalines): + meta, line = metaline + line = line[:max_width - 1] + line_type = meta["type"] + if line_type == LineType.TITLE_1: + attributes = curses.color_pair(ColorPairs.TITLE_1) | curses.A_BOLD + window.addstr(line, attributes) + elif line_type == LineType.TITLE_2: + attributes = curses.color_pair(ColorPairs.TITLE_2) | curses.A_BOLD + window.addstr(line, attributes) + elif line_type == LineType.TITLE_3: + window.addstr(line, curses.color_pair(ColorPairs.TITLE_3)) + elif line_type == LineType.LINK: + window.addstr(line, curses.color_pair(ColorPairs.LINK)) + elif line_type == LineType.PREFORMATTED: + window.addstr(line, curses.color_pair(ColorPairs.PREFORMATTED)) + elif line_type == LineType.BLOCKQUOTE: + attributes = ( + curses.color_pair(ColorPairs.BLOCKQUOTE) + | curses.A_ITALIC + ) + window.addstr(line, attributes) + else: # includes LineType.PARAGRAPH + window.addstr(line) + if line_index < num_lines - 1: + window.addstr("\n") + return num_lines, max_width diff --git a/bebop/screen.py b/bebop/screen.py new file mode 100644 index 0000000..edeefa6 --- /dev/null +++ b/bebop/screen.py @@ -0,0 +1,425 @@ +import curses +import curses.ascii +import curses.textpad +import os + +from bebop.colors import ColorPair, init_colors +from bebop.gemtext import parse_gemtext +from bebop.mouse import ButtonState +from bebop.navigation import join_url, parse_url +from bebop.protocol import Request, Response +from bebop.rendering import format_elements, render_lines + + +class Screen: + + MAX_COLS = 1000 + + def __init__(self, cert_stash): + self.stash = cert_stash + self.screen = None + self.dim = (0, 0) + self.content_pad = None + self.content_pad_dim = (0, 0) + self.status_window = None + self.command_window = None + self.command_textbox = None + self.metalines = [] + self.current_url = "" + self.current_line = 0 + self.current_column = 0 + self.links = {} + self.status_data = ("", 0) + + @property + def h(self): + return self.dim[0] + + @property + def w(self): + return self.dim[1] + + def run(self, *args, **kwargs): + """Use curses' wrapper around _run.""" + os.environ.setdefault("ESCDELAY", "25") + curses.wrapper(self._run, *args, **kwargs) + + def _run(self, stdscr, start_url=None): + """Start displaying content and handling events.""" + self.screen = stdscr + self.screen.clear() + self.screen.refresh() + init_colors() + curses.mousemask(curses.ALL_MOUSE_EVENTS) + + self.dim = self.screen.getmaxyx() + self.content_pad_dim = (self.h - 2, Screen.MAX_COLS) + self.content_pad = curses.newpad(*self.content_pad_dim) + self.content_pad.scrollok(True) + self.content_pad.idlok(True) + self.status_window = self.screen.subwin( + *self.line_dim, + *self.status_window_pos, + ) + self.command_window = self.screen.subwin( + *self.line_dim, + *self.command_window_pos, + ) + curses.curs_set(0) + + pending_url = start_url + running = True + while running: + if pending_url: + self.open_gemini_url(pending_url) + pending_url = None + + char = self.screen.getch() + if char == ord("q"): + running = False + elif char == ord(":"): + command = self.input_common_command() + self.set_status(f"Command: {command}") + elif char == ord("s"): + self.set_status(f"h {self.h} w {self.w} cl {self.current_line} cc {self.current_column}") + elif char == ord("r"): + self.refresh_content() + elif char == ord("h"): + if self.current_column > 0: + self.current_column -= 1 + self.refresh_content() + elif char == ord("j"): + self.scroll_content(1) + elif char == ord("k"): + self.scroll_content(1, scroll_up=True) + elif char == ord("l"): + if self.current_column < Screen.MAX_COLS - self.w: + self.current_column += 1 + self.refresh_content() + elif curses.ascii.isdigit(char): + self.handle_digit_input(char) + elif char == curses.KEY_MOUSE: + self.handle_mouse(*curses.getmouse()) + elif char == curses.KEY_RESIZE: + self.handle_resize() + + @property + def content_window_refresh_size(self): + return self.h - 3, self.w - 1 + + @property + def status_window_pos(self): + return self.h - 2, 0 + + @property + def command_window_pos(self): + return self.h - 1, 0 + + @property + def line_dim(self): + return 1, self.w + + def refresh_windows(self): + self.refresh_content() + self.refresh_status() + self.clear_command() + + def refresh_content(self): + """Refresh content pad's view using the current line/column.""" + refresh_size = self.content_window_refresh_size + if refresh_size[0] <= 0 or refresh_size[1] <= 0: + return + self.content_pad.refresh( + self.current_line, self.current_column, 0, 0, *refresh_size + ) + + def refresh_status(self): + """Refresh status line contents.""" + text, pair = self.status_data + text = text[:self.w - 1] + self.status_window.addstr(0, 0, text, curses.color_pair(pair)) + self.status_window.clrtoeol() + self.status_window.refresh() + + def scroll_content(self, num_lines: int, scroll_up: bool =False): + """Make the content pad scroll up and down by *num_lines*.""" + if scroll_up: + min_line = 0 + if self.current_line > min_line: + self.current_line = max(self.current_line - num_lines, min_line) + self.refresh_content() + else: + max_line = self.content_pad_dim[0] - self.h + 2 + if self.current_line < max_line: + self.current_line = min(self.current_line + num_lines, max_line) + self.refresh_content() + + def set_status(self, text): + """Set a regular message in the status bar.""" + self.status_data = text, ColorPair.NORMAL + self.refresh_status() + + def set_status_error(self, text): + """Set an error message in the status bar.""" + self.status_data = f"Error: {text}", ColorPair.ERROR + self.refresh_status() + + def open_url(self, url): + """Try to open an URL. + + If the URL is not strictly absolute, it will be opened relatively to the + current URL, unless there is no current URL yet. + """ + if self.current_url: + parts = parse_url(url) + else: + parts = parse_url(url, absolute=True) + if parts.scheme == "gemini": + if not parts.netloc: + url = join_url(self.current_url, url) + self.open_gemini_url(url) + else: + self.set_status_error(f"protocol {parts.scheme} not supported.") + + def open_gemini_url(self, url): + """Open a Gemini URL and set the formatted response as content.""" + self.set_status(f"Loading {url}") + req = Request(url, self.stash) + connected = req.connect() + if not connected: + if req.state == Request.STATE_ERROR_CERT: + self.set_status_error("certificate was missing or corrupt.") + elif req.state == Request.STATE_UNTRUSTED_CERT: + self.set_status_error("certificate has been changed.") + # TODO propose the user ways to handle this. + else: + self.set_status_error("connection failed.") + return + + if req.state == Request.STATE_INVALID_CERT: + # TODO propose abort / temp trust + pass + elif req.state == Request.STATE_UNKNOWN_CERT: + # TODO propose abort / temp trust / perm trust + pass + else: + pass # TODO + + response = Response.parse(req.proceed()) + if not response: + self.set_status_error("server response parsing failed.") + return + + if response.code != 20: + self.set_status_error(f"unknown response code {response.code}.") + return + + self.set_status(url) + self.current_url = url + self.show_gemtext(response.content) + + def show_gemtext(self, gemtext: bytes): + """Render Gemtext data in the content pad.""" + elements = parse_gemtext(gemtext) + self.metalines = format_elements(elements, 80) + self.links = { + meta["link_id"]: meta["url"] + for meta, _ in self.metalines + if "link_id" in meta and "url" in meta + } + + self.content_pad.clear() + h, w = render_lines(self.metalines, self.content_pad, Screen.MAX_COLS) + self.content_pad_dim = (h, w) + self.current_line = 0 + self.current_column = 0 + self.refresh_content() + + def focus_command(self, command_char, validator=None, prefix=""): + """Give user focus to the command bar. + + Show the command char and give focus to the command textbox. The + validator function is passed to the textbox. + + Arguments: + - command_char: char to display before the command line. + - validator: function to use to validate the input chars. + - prefix: string to insert before the cursor in the command line. + + Returns: + User input as string. The string will be empty if the validator raised + an EscapeInterrupt. + """ + assert self.command_window is not None + self.command_window.clear() + self.command_window.refresh() + self.command_textbox = curses.textpad.Textbox(self.command_window) + self.command_window.addstr(command_char + prefix) + curses.curs_set(1) + try: + command = self.command_textbox.edit(validator)[1:] + except EscapeCommandInterrupt: + command = "" + except TerminateCommandInterrupt as exc: + command = exc.command + curses.curs_set(0) + self.clear_command() + return command + + def gather_current_command(self): + """Return the string currently written by the user in command line.""" + return self.command_textbox.gather()[1:].rstrip() + + def clear_command(self): + """Clear the command line """ + self.command_window.clear() + self.command_window.refresh() + self.screen.delch(self.h - 1, 0) + self.screen.refresh() + + def input_common_command(self): + """Focus command line to type a regular command. Currently useless.""" + return self.focus_command(":", self.validate_common_char) + + def validate_common_char(self, ch: int): + """Generic input validator, handles a few more cases than default. + + This validator can be used as a default validator as it handles, on top + of the Textbox defaults: + - Erasing the first command char, i.e. clearing the line, cancels the + command input. + - Pressing ESC also cancels the input. + + This validator can be safely called at the beginning of other validators + to handle the keys above. + """ + if ch == curses.KEY_BACKSPACE: # Cancel input if all line is cleaned. + text = self.gather_current_command() + if len(text) == 0: + raise EscapeCommandInterrupt() + elif ch == curses.ascii.ESC: # Could be ESC or ALT + self.screen.nodelay(True) + ch = self.screen.getch() + if ch == -1: + raise EscapeCommandInterrupt() + self.screen.nodelay(False) + return ch + + def handle_digit_input(self, init_char: int): + """Handle a initial digit input by the user. + + When a digit key is pressed, the user intents to visit a link (or + dropped something on the numpad). To reduce the number of key types + needed, Bebop uses the following algorithm: + - If the highest link ID on the page is less than 10, pressing the key + takes you to the link. + - If it's higher than 10, the user either inputs as many digits required + to disambiguate the link ID, or press enter to validate her input. + + Examples + - I have 3 links. Pressing "2" takes me to link 2. + - I have 15 links. Pressing "3" and Enter takes me to link 2. + - I have 15 links. Pressing "1" and "2" takes me to link 12 (no + ambiguity, so Enter is not required). + - I have 456 links. Pressing "1", "2" and Enter takes me to link 12. + - I have 456 links. Pressing "1", "2" and "6" takes me to link 126 (no + ambiguity as well). + """ + digit = init_char & 0xf + num_links = len(self.links) + if num_links < 10: + self.open_link(digit) + return + required_digits = 0 + while num_links: + required_digits += 1 + num_links //= 10 + link_input = self.focus_command( + "~", + validator=lambda ch: self._validate_link_digit(ch, required_digits), + prefix=chr(init_char), + ) + try: + link_id = int(link_input) + except ValueError: + self.set_status_error("invalid link ID") + return + self.open_link(link_id) + + def _validate_link_digit(self, ch: int, required_digits: int): + """Handle input chars to be used as link ID.""" + # Handle common chars. + ch = self.validate_common_char(ch) + # Only accept digits. If we reach the amount of required digits, open + # link now and leave command line. Else just process it. + if curses.ascii.isdigit(ch): + digits = self.gather_current_command() + if len(digits) + 1 == required_digits: + raise TerminateCommandInterrupt(digits + chr(ch)) + return ch + # If not a digit but a printable character, ignore it. + if curses.ascii.isprint(ch): + return 0 + # Everything else could be a control character and should be processed. + return ch + + def disambiguate_link_id(self, digits: str, max_digits: int): + if len(digits) == max_digits: + return digits + + + def open_link(self, link_id: int): + """Open the link with this link ID.""" + if not link_id in self.links: + self.set_status_error(f"unknown link ID {link_id}.") + return + self.open_url(self.links[link_id]) + + def handle_mouse(self, mouse_id: int, x: int, y: int, z: int, bstate: int): + """Handle mouse events. + + Right now, only vertical scrolling is handled. + """ + if bstate & ButtonState.SCROLL_UP: + self.scroll_content(3, scroll_up=True) + elif bstate & ButtonState.SCROLL_DOWN: + self.scroll_content(3) + + def handle_resize(self): + """Try to not make everything collapse on resizes.""" + # Refresh the whole screen before changing windows to avoid random + # blank screens. + self.screen.refresh() + old_dim = self.dim + self.dim = self.screen.getmaxyx() + # Avoid work if the resizing does not impact us. + if self.dim == old_dim: + return + # Resize windows to fit the new dimensions. Content pad will be updated + # on its own at the end of the function. + self.status_window.resize(*self.line_dim) + self.command_window.resize(*self.line_dim) + # Move the windows to their new position if that's still possible. + if self.status_window_pos[0] >= 0: + self.status_window.mvwin(*self.status_window_pos) + if self.command_window_pos[0] >= 0: + self.command_window.mvwin(*self.command_window_pos) + # If the content pad does not fit its whole place, we have to clean the + # gap between it and the status line. Refresh all screen. + if self.content_pad_dim[0] < self.h - 2: + self.screen.clear() + self.screen.refresh() + self.refresh_windows() + + +class EscapeCommandInterrupt(Exception): + """Signal that ESC has been pressed during command line.""" + pass + + +class TerminateCommandInterrupt(Exception): + """Signal that validation ended command line input early. Use `command`.""" + + def __init__(self, command: str, *args, **kwargs): + super().__init__(*args, **kwargs) + self.command = command diff --git a/bebop/tests/__init__.py b/bebop/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/bebop/tests/test_navigation.py b/bebop/tests/test_navigation.py new file mode 100644 index 0000000..9eb24de --- /dev/null +++ b/bebop/tests/test_navigation.py @@ -0,0 +1,30 @@ +import unittest + +from ..navigation import join_url, parse_url + + +class TestNavigation(unittest.TestCase): + + def test_parse_url(self): + res = parse_url("gemini://dece.space/parse-me.gmi") + self.assertEqual(res.scheme, "gemini") + self.assertEqual(res.netloc, "dece.space") + self.assertEqual(res.path, "/parse-me.gmi") + + res_netloc = parse_url("//dece.space/parse-me.gmi") + self.assertEqual(res, res_netloc) + + res = parse_url("dece.space/parse-me.gmi", absolute=True) + self.assertEqual(res.scheme, "gemini") + self.assertEqual(res.netloc, "dece.space") + self.assertEqual(res.path, "/parse-me.gmi") + + def test_join_url(self): + url = join_url("gemini://dece.space", "some-file.gmi") + self.assertEqual(url, "gemini://dece.space/some-file.gmi") + url = join_url("gemini://dece.space", "some-file.gmi") + self.assertEqual(url, "gemini://dece.space/some-file.gmi") + url = join_url("gemini://dece.space/dir1/file.gmi", "other-file.gmi") + self.assertEqual(url, "gemini://dece.space/dir1/other-file.gmi") + url = join_url("gemini://dece.space/dir1/file.gmi", "../top-level.gmi") + self.assertEqual(url, "gemini://dece.space/top-level.gmi") diff --git a/bebop/tests/test_protocol.py b/bebop/tests/test_protocol.py new file mode 100644 index 0000000..9fe8dc8 --- /dev/null +++ b/bebop/tests/test_protocol.py @@ -0,0 +1,10 @@ +import unittest + +from ..protocol import parse_gemini_url + + +class TestGemini(unittest.TestCase): + + def test_parse_url(self): + r1 = parse_gemini_url("gemini://dece.space") + self.assertDictEqual(r1, {"host": "dece.space", "path": ""}) diff --git a/bebop/tests/test_rendering.py b/bebop/tests/test_rendering.py new file mode 100644 index 0000000..66add85 --- /dev/null +++ b/bebop/tests/test_rendering.py @@ -0,0 +1,41 @@ +import unittest + +from ..rendering import _explode_words, _find_next_sep, wrap_words + + +class TestRenderer(unittest.TestCase): + + def test_wrap_words(self): + t = "wrap me wrap me youcantwrapthisonewithoutforce bla bla bla bla" + lines = wrap_words(t, 10) + expected = [ + "wrap me ", + "wrap me ", + "youcantwr-", + "apthisone-", + "withoutfo-", + "rce bla ", + "bla bla ", + "bla", + ] + self.assertListEqual(lines, expected) + + def test_explode_words(self): + t = "unsplittableword word-dash tabatmyleft dot.sep" + words = _explode_words(t) + expected = [ + "unsplittableword", " ", "word-", "dash", " ", "tabatmyleft", + " ", "dot.sep" + ] + self.assertListEqual(words, expected) + + def test_find_next_sep(self): + t = "unsplittableword word-dash" + sep, index = _find_next_sep(t) + self.assertEqual((sep, index), (" ", 16)) + t = t[17:] + sep, index = _find_next_sep(t) + self.assertEqual((sep, index), ("-", 4)) + t = t[5:] + sep, index = _find_next_sep(t) + self.assertEqual((sep, index), ("", 0)) diff --git a/bebop/tofu.py b/bebop/tofu.py new file mode 100644 index 0000000..a89e715 --- /dev/null +++ b/bebop/tofu.py @@ -0,0 +1,95 @@ +import datetime +import hashlib +import re +from enum import Enum + +import asn1crypto.x509 + +STASH_LINE_RE = re.compile(r"(\S+) (\S+) (\S+) (\d+)") + + +def load_cert_stash(stash_path): + """Load the certificate stash from the file, or None on error. + + The stash is a dict with host names as keys and tuples as values. Tuples + have four elements: + - the fingerprint algorithm (only SHA-512 is supported), + - the fingerprint as an hexstring, + - the timestamp of the expiration date, + - a boolean that is True when the stash is loaded from a file, i.e. always + true for entries loaded in this function, but should be false when it + concerns a certificate temporary trusted for the session only; this flag + is used to decide whether to save the certificate in the stash at exit. + """ + stash = {} + try: + with open(stash_path, "rt") as stash_file: + for line in stash_file: + match = STASH_LINE_RE.match(line) + if not match: + continue + name, algo, fingerprint, timestamp = match.groups() + stash[name] = (algo, fingerprint, timestamp, True) + except (OSError, ValueError): + return None + return stash + + +class CertStatus(Enum): + """Value returned by validate_cert.""" + # Cert is valid: proceed. + VALID = 0 # Known and valid. + VALID_NEW = 7 # New and valid. + # Cert is unusable or wrong: abort. + ERROR = 1 # General error. + WRONG_FINGERPRINT = 2 # Fingerprint in the stash is different. + # Cert has some issues: ask to proceed. + NOT_VALID_YET = 3 # not-before date invalid. + EXPIRED = 4 # not-after date invalid. + BAD_DOMAIN = 5 # Host name is not in cert's valid domains. + + +CERT_STATUS_INVALID = ( + CertStatus.NOT_VALID_YET, + CertStatus.EXPIRED, + CertStatus.BAD_DOMAIN, +) + + +def validate_cert(der, hostname, cert_stash): + """Return a tuple (CertStatus, Certificate) for this certificate.""" + if der is None: + return CertStatus.ERROR, None + try: + cert = asn1crypto.x509.Certificate.load(der) + except ValueError: + return CertStatus.ERROR, None + + # Check for sane parameters. + now = datetime.datetime.now(tz=datetime.timezone.utc) + if now < cert.not_valid_before: + return CertStatus.NOT_VALID_YET, cert + if now > cert.not_valid_after: + return CertStatus.EXPIRED, cert + if hostname not in cert.valid_domains: + return CertStatus.BAD_DOMAIN, cert + + # Check the entire certificate fingerprint. + cert_hash = hashlib.sha512(der).hexdigest() + if hostname in cert_stash: + _, fingerprint, timestamp, _ = cert_stash[hostname] + if timestamp >= now.timestamp(): + if cert_hash != fingerprint: + return CertStatus.WRONG_FINGERPRINT, cert + else: + # Disregard expired fingerprints. + pass + return CertStatus.VALID, cert + + # The certificate is unknown and valid. + return CertStatus.VALID_NEW, cert + + +def trust(cert_stash, hostname, algo, fingerprint, timestamp, + trust_always=False): + cert_stash[hostname] = (algo, fingerprint, timestamp, trust_always) diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..35a704e --- /dev/null +++ b/requirements.txt @@ -0,0 +1 @@ +asn1crypto