Compare commits
No commits in common. "1fddf4c2b250a73015c5ea0c0b3b29c8a3584fb5" and "65740487635a7c0c72a07f7336080af9356e6c3c" have entirely different histories.
1fddf4c2b2
...
6574048763
|
@ -12,13 +12,13 @@ TODO DONE
|
|||
view/edit sources
|
||||
downloads
|
||||
configuration
|
||||
help page
|
||||
TOFU
|
||||
open last download
|
||||
actual TOFU
|
||||
home page
|
||||
media files
|
||||
view history
|
||||
identity management
|
||||
help page for keybinds
|
||||
--------------------------------------------------------------------------------
|
||||
BACKLOG
|
||||
click on links to open them
|
||||
|
|
10
README.md
10
README.md
|
@ -29,13 +29,11 @@ Why use Bebop instead of something else?
|
|||
|
||||
### Lightweight
|
||||
|
||||
It does not use any external dependencies. Everything including NCurses or TLS
|
||||
is done using Python's standard library.
|
||||
It only uses a single dependency, [asn1crypto][asn1crypto], to delegate
|
||||
parsing certificates. Everything else including NCurses or TLS is done using
|
||||
Python's standard library.
|
||||
|
||||
### Nice keybinds
|
||||
|
||||
A lot of keybinds are defined, and Vim users should get quickly familiar with
|
||||
them. Find them in the help page by pressing `?`.
|
||||
[asn1crypto]: https://github.com/wbond/asn1crypto
|
||||
|
||||
### Fun
|
||||
|
||||
|
|
|
@ -2,8 +2,8 @@ import argparse
|
|||
|
||||
from bebop.browser.browser import Browser
|
||||
from bebop.config import load_config
|
||||
from bebop.fs import ensure_bebop_files_exist, get_config_path
|
||||
from bebop.tofu import get_cert_stash_path, load_cert_stash, save_cert_stash
|
||||
from bebop.fs import get_config_path, get_user_data_path
|
||||
from bebop.tofu import load_cert_stash, save_cert_stash
|
||||
|
||||
|
||||
def main():
|
||||
|
@ -19,9 +19,11 @@ def main():
|
|||
config_path = get_config_path()
|
||||
config = load_config(config_path)
|
||||
|
||||
ensure_bebop_files_exist()
|
||||
user_data_path = get_user_data_path()
|
||||
if not user_data_path.exists():
|
||||
user_data_path.mkdir()
|
||||
|
||||
cert_stash_path = get_cert_stash_path()
|
||||
cert_stash_path = user_data_path / "known_hosts.txt"
|
||||
cert_stash = load_cert_stash(cert_stash_path) or {}
|
||||
try:
|
||||
Browser(config, cert_stash).run(start_url=start_url)
|
||||
|
|
|
@ -13,7 +13,6 @@ from bebop.bookmarks import (
|
|||
from bebop.colors import ColorPair, init_colors
|
||||
from bebop.command_line import CommandLine
|
||||
from bebop.external import open_external_program
|
||||
from bebop.help import HELP_PAGE
|
||||
from bebop.history import History
|
||||
from bebop.links import Links
|
||||
from bebop.mouse import ButtonState
|
||||
|
@ -24,30 +23,11 @@ from bebop.page_pad import PagePad
|
|||
|
||||
|
||||
class Browser:
|
||||
"""Manage the events, inputs and rendering.
|
||||
|
||||
Attributes:
|
||||
- config: config dict passed to the browser.
|
||||
- stash: certificate stash passed to the browser.
|
||||
- screen: curses stdscr.
|
||||
- dim: current screen dimensions.
|
||||
- page_pad: curses pad containing the current page view.
|
||||
- status_line: curses window used to report current status.
|
||||
- command_line: a CommandLine object for the user to interact with.
|
||||
- running: the browser will continue running while this is true.
|
||||
- status_data: 3-uple of status text, color pair and attributes of the
|
||||
status line, used to reset status after an error.
|
||||
- history: an History object.
|
||||
- cache: a dict containing cached pages
|
||||
- special_pages: a dict containing page names used with "bebop" scheme;
|
||||
values are dicts as well: the "open" key maps to a callable to use when
|
||||
the page is accessed, and the optional "source" key maps to callable
|
||||
returning the page source path.
|
||||
"""
|
||||
"""Manage the events, inputs and rendering."""
|
||||
|
||||
def __init__(self, config, cert_stash):
|
||||
self.config = config
|
||||
self.stash = cert_stash
|
||||
self.stash = cert_stash or {}
|
||||
self.screen = None
|
||||
self.dim = (0, 0)
|
||||
self.page_pad = None
|
||||
|
@ -57,7 +37,6 @@ class Browser:
|
|||
self.status_data = ("", 0, 0)
|
||||
self.history = History()
|
||||
self.cache = {}
|
||||
self.special_pages = self.setup_special_pages()
|
||||
self._current_url = ""
|
||||
|
||||
@property
|
||||
|
@ -79,18 +58,6 @@ class Browser:
|
|||
self._current_url = url
|
||||
self.set_status(url)
|
||||
|
||||
def setup_special_pages(self):
|
||||
"""Return a dict with the special pages functions."""
|
||||
return {
|
||||
"bookmarks": {
|
||||
"open": self.open_bookmarks,
|
||||
"source": lambda: str(get_bookmarks_path())
|
||||
},
|
||||
"help": {
|
||||
"open": self.open_help,
|
||||
},
|
||||
}
|
||||
|
||||
def run(self, *args, **kwargs):
|
||||
"""Use curses' wrapper around _run."""
|
||||
os.environ.setdefault("ESCDELAY", "25")
|
||||
|
@ -132,9 +99,7 @@ class Browser:
|
|||
|
||||
def handle_inputs(self):
|
||||
char = self.screen.getch()
|
||||
if char == ord("?"):
|
||||
self.open_help()
|
||||
elif char == ord(":"):
|
||||
if char == ord(":"):
|
||||
self.quick_command("")
|
||||
elif char == ord("r"):
|
||||
self.reload_page()
|
||||
|
@ -253,11 +218,11 @@ class Browser:
|
|||
|
||||
def quick_command(self, command):
|
||||
"""Shortcut method to take user input with a prefixed command string."""
|
||||
prefix = command + " " if command else ""
|
||||
text = self.command_line.focus(CommandLine.CHAR_COMMAND, prefix=prefix)
|
||||
if not text:
|
||||
prefix = f"{command} " if command else ""
|
||||
user_input = self.command_line.focus(":", prefix=prefix)
|
||||
if not user_input:
|
||||
return
|
||||
self.process_command(text)
|
||||
self.process_command(user_input)
|
||||
|
||||
def process_command(self, command_text: str):
|
||||
"""Handle a client command."""
|
||||
|
@ -272,9 +237,6 @@ class Browser:
|
|||
return
|
||||
if command in ("o", "open"):
|
||||
self.open_url(words[1], assume_absolute=True)
|
||||
elif command == "forget-certificate":
|
||||
from bebop.browser.gemini import forget_certificate
|
||||
forget_certificate(self, words[1])
|
||||
|
||||
def open_url(self, url, base_url=None, redirects=0, assume_absolute=False,
|
||||
history=True, use_cache=True):
|
||||
|
@ -325,11 +287,8 @@ class Browser:
|
|||
from bebop.browser.file import open_file
|
||||
open_file(self, parts.path, history=history)
|
||||
elif parts.scheme == "bebop":
|
||||
special_page = self.special_pages.get(parts.netloc)
|
||||
if special_page:
|
||||
special_page["open"]()
|
||||
else:
|
||||
self.set_status_error("Unknown page.")
|
||||
if parts.netloc == "bookmarks":
|
||||
self.open_bookmarks()
|
||||
else:
|
||||
self.set_status_error(f"Protocol {parts.scheme} not supported.")
|
||||
|
||||
|
@ -483,10 +442,7 @@ class Browser:
|
|||
return
|
||||
self.set_status("Bookmark title?")
|
||||
current_title = self.page_pad.current_page.title or ""
|
||||
title = self.command_line.focus(
|
||||
CommandLine.CHAR_TEXT,
|
||||
prefix=current_title
|
||||
)
|
||||
title = self.command_line.focus(">", prefix=current_title)
|
||||
if title:
|
||||
title = title.strip()
|
||||
if title:
|
||||
|
@ -502,13 +458,11 @@ class Browser:
|
|||
directly from their location on disk.
|
||||
"""
|
||||
delete_source_after = False
|
||||
if self.current_url.startswith("bebop://"):
|
||||
page_name = self.current_url[len("bebop://"):]
|
||||
special_pages_functions = self.special_pages.get(page_name)
|
||||
if not special_pages_functions:
|
||||
return
|
||||
get_source = special_pages_functions.get("source")
|
||||
source_filename = get_source() if get_source else None
|
||||
special_pages = {
|
||||
"bebop://bookmarks": str(get_bookmarks_path())
|
||||
}
|
||||
if self.current_url in special_pages:
|
||||
source_filename = special_pages[self.current_url]
|
||||
else:
|
||||
if not self.page_pad.current_page:
|
||||
return
|
||||
|
@ -518,21 +472,8 @@ class Browser:
|
|||
source_filename = source_file.name
|
||||
delete_source_after = True
|
||||
|
||||
if not source_filename:
|
||||
return
|
||||
|
||||
command = self.config["source_editor"] + [source_filename]
|
||||
open_external_program(command)
|
||||
if delete_source_after:
|
||||
os.unlink(source_filename)
|
||||
self.refresh_windows()
|
||||
|
||||
def open_help(self):
|
||||
"""Show the help page."""
|
||||
self.load_page(Page.from_gemtext(HELP_PAGE, self.config["text_width"]))
|
||||
self.current_url = "bebop://help"
|
||||
|
||||
def prompt(self, text, keys):
|
||||
"""Display the text and allow it to type one of the given keys."""
|
||||
self.set_status(text)
|
||||
return self.command_line.prompt_key(keys)
|
||||
|
|
|
@ -3,12 +3,10 @@
|
|||
from pathlib import Path
|
||||
|
||||
from bebop.browser.browser import Browser
|
||||
from bebop.command_line import CommandLine
|
||||
from bebop.fs import get_downloads_path
|
||||
from bebop.navigation import set_parameter
|
||||
from bebop.page import Page
|
||||
from bebop.protocol import Request, Response
|
||||
from bebop.tofu import trust_fingerprint, untrust_fingerprint, WRONG_FP_ALERT
|
||||
|
||||
|
||||
MAX_URL_LEN = 1024
|
||||
|
@ -18,30 +16,10 @@ def open_gemini_url(browser: Browser, url, redirects=0, history=True,
|
|||
use_cache=True):
|
||||
"""Open a Gemini URL and set the formatted response as content.
|
||||
|
||||
While the specification is not set in stone, every client takes a slightly
|
||||
different approach to enforcing TOFU. Read the `Request.connect` docs to
|
||||
find about cases where connection is aborted without asking the user. What
|
||||
interests us here is what happens when the user should decide herself? This
|
||||
happens in several cases, matching the request possible states. Here is
|
||||
what Bebop do (or want to do):
|
||||
|
||||
- STATE_INVALID_CERT: the certificate has non-fatal issues; we may
|
||||
present the user the problems found and let her decide whether to trust
|
||||
temporarily the certificate or not BUT we currently do not parse the
|
||||
certificate's fields, so this state is never used.
|
||||
- STATE_UNKNOWN_CERT: the certificate is valid but has not been seen before;
|
||||
as we're doing TOFU here, we could automatically trust it or let the user
|
||||
choose. For simplicity, we always trust it permanently.
|
||||
|
||||
Attributes:
|
||||
- browser: Browser object making the request.
|
||||
- url: a valid URL with Gemini scheme to open.
|
||||
- redirects: current amount of redirections done to open the initial URL.
|
||||
- history: if true, save the final URL to history.
|
||||
- use_cache: if true, look up if the page is cached before requesting it.
|
||||
After initiating the connection, TODO
|
||||
"""
|
||||
if len(url) >= MAX_URL_LEN:
|
||||
browser.set_status_error("Request URL too long.")
|
||||
browser.set_status_error(f"Request URL too long.")
|
||||
return
|
||||
|
||||
browser.set_status(f"Loading {url}")
|
||||
|
@ -61,10 +39,10 @@ def open_gemini_url(browser: Browser, url, redirects=0, history=True,
|
|||
if req.state == Request.STATE_ERROR_CERT:
|
||||
error = f"Certificate was missing or corrupt ({url})."
|
||||
elif req.state == Request.STATE_UNTRUSTED_CERT:
|
||||
_handle_untrusted_cert(browser, req)
|
||||
error = f"Certificate has been changed ({url})."
|
||||
# TODO propose the user ways to handle this.
|
||||
elif req.state == Request.STATE_CONNECTION_FAILED:
|
||||
error_details = ": " + req.error if req.error else "."
|
||||
error_details = f": {req.error}" if req.error else "."
|
||||
error = f"Connection failed ({url})" + error_details
|
||||
else:
|
||||
error = f"Connection failed ({url})."
|
||||
|
@ -72,18 +50,13 @@ def open_gemini_url(browser: Browser, url, redirects=0, history=True,
|
|||
return
|
||||
|
||||
if req.state == Request.STATE_INVALID_CERT:
|
||||
# TODO propose abort / temp trust
|
||||
pass
|
||||
elif req.state == Request.STATE_UNKNOWN_CERT:
|
||||
# Certificate is valid but unknown: trust it permanently.
|
||||
hostname = req.hostname
|
||||
fingerprint = req.cert_validation["hash"]
|
||||
trust_fingerprint(
|
||||
browser.stash,
|
||||
hostname,
|
||||
"SHA-512",
|
||||
fingerprint,
|
||||
trust_always=True
|
||||
)
|
||||
# TODO propose abort / temp trust / perm trust
|
||||
pass
|
||||
else:
|
||||
pass # TODO
|
||||
|
||||
data = req.proceed()
|
||||
if not data:
|
||||
|
@ -94,47 +67,21 @@ def open_gemini_url(browser: Browser, url, redirects=0, history=True,
|
|||
browser.set_status_error(f"Server response parsing failed ({url}).")
|
||||
return
|
||||
|
||||
_handle_response(browser, response, url, redirects, history)
|
||||
|
||||
|
||||
def _handle_untrusted_cert(browser: Browser, request: Request):
|
||||
"""Handle a mismatch between known & server fingerprints.
|
||||
|
||||
This function formats an alert page to explain to the user what the hell is
|
||||
going on and displays it.
|
||||
"""
|
||||
remote_fp = request.cert_validation["hash"]
|
||||
local_fp = request.cert_validation["saved_hash"]
|
||||
alert_page_source = WRONG_FP_ALERT.format(
|
||||
hostname=request.hostname,
|
||||
local_fp=local_fp,
|
||||
remote_fp=remote_fp,
|
||||
)
|
||||
alert_page = Page.from_gemtext(
|
||||
alert_page_source,
|
||||
browser.config["text_width"]
|
||||
)
|
||||
browser.load_page(alert_page)
|
||||
|
||||
|
||||
def _handle_response(browser: Browser, response: Response, url: str,
|
||||
redirects: int, history: bool):
|
||||
"""Handle a response from a Gemini server."""
|
||||
if response.code == 20:
|
||||
_handle_successful_response(browser, response, url, history)
|
||||
handle_response_content(browser, url, response, history)
|
||||
elif response.generic_code == 30 and response.meta:
|
||||
browser.open_url(response.meta, base_url=url, redirects=redirects + 1)
|
||||
elif response.generic_code in (40, 50):
|
||||
error = f"Server error: {response.meta or Response.code.name}"
|
||||
browser.set_status_error(error)
|
||||
elif response.generic_code == 10:
|
||||
_handle_input_request(browser, url, response.meta)
|
||||
handle_input_request(browser, url, response.meta)
|
||||
else:
|
||||
error = f"Unhandled response code {response.code}"
|
||||
browser.set_status_error(error)
|
||||
|
||||
|
||||
def _handle_successful_response(browser: Browser, response: Response, url: str,
|
||||
def handle_response_content(browser: Browser, url: str, response: Response,
|
||||
history: bool):
|
||||
"""Handle a successful response content from a Gemini server.
|
||||
|
||||
|
@ -169,7 +116,7 @@ def _handle_successful_response(browser: Browser, response: Response, url: str,
|
|||
text = response.content.decode("utf-8", errors="replace")
|
||||
page = Page.from_text(text)
|
||||
else:
|
||||
filepath = _get_download_path(url)
|
||||
filepath = get_download_path(url)
|
||||
|
||||
if page:
|
||||
browser.load_page(page)
|
||||
|
@ -190,7 +137,7 @@ def _handle_successful_response(browser: Browser, response: Response, url: str,
|
|||
browser.set_status_error(error)
|
||||
|
||||
|
||||
def _get_download_path(url: str) -> Path:
|
||||
def get_download_path(url: str) -> Path:
|
||||
"""Try to find the best download file path possible from this URL."""
|
||||
download_dir = get_downloads_path()
|
||||
url_parts = url.rsplit("/", maxsplit=1)
|
||||
|
@ -202,26 +149,14 @@ def _get_download_path(url: str) -> Path:
|
|||
return download_dir / filename
|
||||
|
||||
|
||||
def _handle_input_request(browser: Browser, from_url: str, message: str =None):
|
||||
def handle_input_request(browser: Browser, from_url: str, message: str =None):
|
||||
"""Focus command-line to pass input to the server."""
|
||||
if message:
|
||||
browser.set_status(f"Input needed: {message}")
|
||||
else:
|
||||
browser.set_status("Input needed:")
|
||||
user_input = browser.command_line.focus(CommandLine.CHAR_TEXT)
|
||||
user_input = browser.command_line.focus("?")
|
||||
if not user_input:
|
||||
return
|
||||
url = set_parameter(from_url, user_input)
|
||||
open_gemini_url(browser, url)
|
||||
|
||||
|
||||
def forget_certificate(browser: Browser, hostname: str):
|
||||
"""Remove the fingerprint associated to this hostname for the cert stash."""
|
||||
key = browser.prompt(f"Remove fingerprint from {hostname}? [y/N]", "ynN")
|
||||
if key != "y":
|
||||
browser.reset_status()
|
||||
return
|
||||
if untrust_fingerprint(browser.stash, hostname):
|
||||
browser.set_status(f"Known certificate for {hostname} removed.")
|
||||
else:
|
||||
browser.set_status_error(f"Known certificate for {hostname} not found.")
|
||||
|
|
|
@ -17,35 +17,18 @@ class CommandLine:
|
|||
the window's right border when writing more content than the width allows.
|
||||
Therefore I just added the M-e keybind to call an external editor and use
|
||||
its content as result.
|
||||
|
||||
Attributes:
|
||||
- window: curses window to use for the command line and Textbox.
|
||||
- editor_command: external command to use to edit content externally.
|
||||
- textbox: Textbox object handling user input.
|
||||
"""
|
||||
|
||||
CHAR_COMMAND = ":"
|
||||
CHAR_DIGIT = "&"
|
||||
CHAR_TEXT = ">"
|
||||
|
||||
def __init__(self, window, editor_command):
|
||||
self.window = window
|
||||
self.editor_command = editor_command
|
||||
self.textbox = curses.textpad.Textbox(self.window)
|
||||
self.textbox = None
|
||||
|
||||
def clear(self):
|
||||
"""Clear command-line contents."""
|
||||
self.window.clear()
|
||||
self.window.refresh()
|
||||
|
||||
def gather(self):
|
||||
"""Return the string currently written by the user in command line.
|
||||
|
||||
This doesn't count the command char used, but it includes then prefix.
|
||||
Trailing whitespace is trimmed.
|
||||
"""
|
||||
return self.textbox.gather()[1:].rstrip()
|
||||
|
||||
def focus(self, command_char, validator=None, prefix=""):
|
||||
"""Give user focus to the command bar.
|
||||
|
||||
|
@ -63,13 +46,13 @@ class CommandLine:
|
|||
User input as string. The string will be empty if the validator raised
|
||||
an EscapeInterrupt.
|
||||
"""
|
||||
validator = validator or self._validate_common_input
|
||||
self.window.clear()
|
||||
self.window.refresh()
|
||||
self.textbox = curses.textpad.Textbox(self.window)
|
||||
self.window.addstr(command_char + prefix)
|
||||
curses.curs_set(1)
|
||||
try:
|
||||
command = self.textbox.edit(validator)
|
||||
command = self.textbox.edit(validator or self.validate_common_input)
|
||||
except EscapeCommandInterrupt:
|
||||
command = ""
|
||||
except TerminateCommandInterrupt as exc:
|
||||
|
@ -80,7 +63,11 @@ class CommandLine:
|
|||
self.clear()
|
||||
return command
|
||||
|
||||
def _validate_common_input(self, ch: int):
|
||||
def gather(self):
|
||||
"""Return the string currently written by the user in command line."""
|
||||
return self.textbox.gather()[1:].rstrip()
|
||||
|
||||
def validate_common_input(self, ch: int):
|
||||
"""Generic input validator, handles a few more cases than default.
|
||||
|
||||
This validator can be used as a default validator as it handles, on top
|
||||
|
@ -149,8 +136,8 @@ class CommandLine:
|
|||
if len(candidates) == 1:
|
||||
return 0, candidates[0]
|
||||
# Else, focus the command line to let the user input more digits.
|
||||
validator = lambda ch: self._validate_link_digit(ch, links, max_digits)
|
||||
link_input = self.focus(CommandLine.CHAR_DIGIT, validator, digit)
|
||||
validator = lambda ch: self.validate_link_digit(ch, links, max_digits)
|
||||
link_input = self.focus("&", validator, digit)
|
||||
if not link_input:
|
||||
return 1, None
|
||||
try:
|
||||
|
@ -159,10 +146,10 @@ class CommandLine:
|
|||
return 2, f"Invalid link ID {link_input}."
|
||||
return 0, link_id
|
||||
|
||||
def _validate_link_digit(self, ch: int, links: Links, max_digits: int):
|
||||
def validate_link_digit(self, ch: int, links: Links, max_digits: int):
|
||||
"""Handle input chars to be used as link ID."""
|
||||
# Handle common chars.
|
||||
ch = self._validate_common_input(ch)
|
||||
ch = self.validate_common_input(ch)
|
||||
# Only accept digits. If we reach the amount of required digits, open
|
||||
# link now and leave command line. Else just process it.
|
||||
if curses.ascii.isdigit(ch):
|
||||
|
@ -198,25 +185,6 @@ class CommandLine:
|
|||
return
|
||||
raise TerminateCommandInterrupt(content)
|
||||
|
||||
def prompt_key(self, keys):
|
||||
"""Focus the command line and wait for the user """
|
||||
validator = lambda ch: self._validate_prompt(ch, keys)
|
||||
key = self.focus(CommandLine.CHAR_TEXT, validator)
|
||||
return key if key in keys else ""
|
||||
|
||||
def _validate_prompt(self, ch: int, keys):
|
||||
"""Handle input chars and raise a terminate interrupt on a valid key."""
|
||||
# Handle common keys.
|
||||
ch = self._validate_common_input(ch)
|
||||
try:
|
||||
char = chr(ch)
|
||||
except ValueError:
|
||||
pass
|
||||
else:
|
||||
if char in keys:
|
||||
raise TerminateCommandInterrupt(char)
|
||||
return 0
|
||||
|
||||
|
||||
class EscapeCommandInterrupt(Exception):
|
||||
"""Signal that ESC has been pressed during command line."""
|
||||
|
|
|
@ -45,11 +45,3 @@ def get_downloads_path() -> Path:
|
|||
if download_path:
|
||||
return Path(download_path)
|
||||
return Path.home()
|
||||
|
||||
|
||||
def ensure_bebop_files_exist():
|
||||
"""Ensure various Bebop's files or directories are present."""
|
||||
# Ensure the user data directory exists.
|
||||
user_data_path = get_user_data_path()
|
||||
if not user_data_path.exists():
|
||||
user_data_path.mkdir(parents=True)
|
||||
|
|
|
@ -1,38 +0,0 @@
|
|||
"""Help page. Currently only keybinds are shown as help."""
|
||||
|
||||
HELP_PAGE = """\
|
||||
# Bebop keybinds
|
||||
|
||||
Keybinds using the SHIFT key are written uppercase. Keybinds using the ALT (or \
|
||||
META) key are written using the "M-" prefix. Symbol keys are written as their \
|
||||
name, not the symbol itself.
|
||||
|
||||
``` list of bebop keybinds
|
||||
- colon: focus the command-line
|
||||
- r: reload page
|
||||
- h: scroll left a bit
|
||||
- j: scroll down a bit
|
||||
- k: scroll up a bit
|
||||
- l: scroll right a bit
|
||||
- H: scroll left a whole page
|
||||
- J: scroll down a whole page
|
||||
- K: scroll up a whole page
|
||||
- L: scroll right a whole page
|
||||
- M-h: scroll one column left
|
||||
- M-j: scroll one line down
|
||||
- M-k: scroll one line up
|
||||
- M-l: scroll one column right
|
||||
- circumflex: horizontally scroll back to the first column
|
||||
- gg: go to the top of the page
|
||||
- G: go to the bottom of the page
|
||||
- o: open an URL
|
||||
- p: go to the previous page
|
||||
- u: go to the parent page (up a level in URL)
|
||||
- U: go to the root page (root URL for the current domain)
|
||||
- b: open bookmarks
|
||||
- B: add current page to bookmarks
|
||||
- e: open the current page source in an editor
|
||||
- digits: go to the corresponding link ID
|
||||
- escape: reset status line text
|
||||
```
|
||||
"""
|
|
@ -8,13 +8,19 @@ from enum import IntEnum
|
|||
from typing import Optional
|
||||
|
||||
from bebop.mime import DEFAULT_MIME_TYPE, MimeType
|
||||
from bebop.tofu import CertStatus, validate_cert
|
||||
from bebop.tofu import CertStatus, CERT_STATUS_INVALID, validate_cert
|
||||
|
||||
|
||||
GEMINI_URL_RE = re.compile(r"gemini://(?P<host>[^/]+)(?P<path>.*)")
|
||||
LINE_TERM = b"\r\n"
|
||||
|
||||
|
||||
def parse_gemini_url(url):
|
||||
"""Return a dict containing the hostname and the request path, or None."""
|
||||
match = GEMINI_URL_RE.match(url)
|
||||
return match.groupdict() if match else None
|
||||
|
||||
|
||||
class Request:
|
||||
"""A Gemini request.
|
||||
|
||||
|
@ -24,21 +30,8 @@ class Request:
|
|||
sending the request header and receiving the response:
|
||||
|
||||
1. Instantiate a Request.
|
||||
2. `connect` opens the connection and aborts it or leaves the caller free to
|
||||
check stuff.
|
||||
2. `connect` opens the connection, leaves the caller free to check stuff.
|
||||
3. `proceed` or `abort` can be called.
|
||||
|
||||
Attributes:
|
||||
- url: URL to open.
|
||||
- cert_stash: certificate stash to use an possibly update.
|
||||
- state: request state.
|
||||
- hostname: hostname derived from url, stored when `connect` is called.
|
||||
- payload: bytes object of the payload request; build during `connect`, used
|
||||
during `proceed`.
|
||||
- ssock: TLS-wrapped socket.
|
||||
- cert_validation: validation results dict, set after certificate has been
|
||||
reviewed.
|
||||
- error: human-readable connection error, may be set during `connect`.
|
||||
"""
|
||||
|
||||
# Initial state, connection is not established yet.
|
||||
|
@ -62,69 +55,28 @@ class Request:
|
|||
self.url = url
|
||||
self.cert_stash = cert_stash
|
||||
self.state = Request.STATE_INIT
|
||||
self.hostname = ""
|
||||
self.payload = b""
|
||||
self.ssock = None
|
||||
self.cert_validation = None
|
||||
self.cert = None
|
||||
self.cert_status = None
|
||||
self.error = ""
|
||||
|
||||
def connect(self, timeout: int) -> bool:
|
||||
def connect(self, timeout):
|
||||
"""Connect to a Gemini server and return a RequestEventType.
|
||||
|
||||
Return True if the connection is established. The caller has to verify
|
||||
the request state and propose appropriate choices to the user if the
|
||||
certificate status is not CertStatus.VALID (Request.STATE_OK).
|
||||
|
||||
If connect returns False, the secure socket is aborted before return so
|
||||
there is no need to call `abort`. If connect returns True, it is up to the
|
||||
caller to decide whether to continue (call `proceed`) the connection or
|
||||
abort it (call `abort`).
|
||||
|
||||
The request `state` is updated to reflect the connection state after the
|
||||
function returns. The following list describes states related to
|
||||
connection failure (False returned):
|
||||
|
||||
- STATE_INVALID_URL: URL is not valid.
|
||||
- STATE_CONNECTION_FAILED: connection failed, either TCP timeout or
|
||||
local TLS failure. Additionally, the request `error` attribute is set
|
||||
to an error string describing the issue.
|
||||
|
||||
For all request states from now on, the `cert_validation` attribute is
|
||||
updated with the result of the certificate validation.
|
||||
|
||||
The following list describes states related to validation failure (False
|
||||
returned):
|
||||
|
||||
- STATE_ERROR_CERT: server certificate could not be validated at all.
|
||||
- STATE_UNTRUSTED_CERT: server certificate mismatched the known
|
||||
certificate for that hostname. The user should be presented with
|
||||
options to solve the matter.
|
||||
|
||||
For other states, the connection is not aborted (True returned):
|
||||
|
||||
- STATE_INVALID_CERT: the certificate has one or more issues, e.g.
|
||||
mismatching hostname or it is expired.
|
||||
- STATE_UNKNOWN_CERT: the certificate is valid but unknown.
|
||||
- STATE_OK: the certificate is valid and matches the known certificate
|
||||
of that hostname.
|
||||
|
||||
After this function returns, the request state cannot be STATE_INIT.
|
||||
|
||||
Additional notes:
|
||||
|
||||
- The DER hash is compared against the fingerprint for this hostname
|
||||
*and port*; the specification does not tell much about that, but we
|
||||
are slightly more restrictive here by adding the port in the equation.
|
||||
- The state STATE_INVALID_CERT is actually never used in Bebop because
|
||||
of the current tendency to ignore any certificate fields and only
|
||||
check the whole cert fingerprint. Here it is considered the same as a
|
||||
valid certificate.
|
||||
If connect returns False, the secure socket is aborted before return. If
|
||||
connect returns True, it is up to the caller to decide whether to
|
||||
continue (call proceed) the connection or abort it (call abort).
|
||||
"""
|
||||
url_parts = GEMINI_URL_RE.match(self.url)
|
||||
url_parts = parse_gemini_url(self.url)
|
||||
if not url_parts:
|
||||
self.state = Request.STATE_INVALID_URL
|
||||
return False
|
||||
hostname = url_parts.groupdict()["host"]
|
||||
hostname = url_parts["host"]
|
||||
if ":" in hostname:
|
||||
hostname, port = hostname.split(":", maxsplit=1)
|
||||
try:
|
||||
|
@ -134,7 +86,6 @@ class Request:
|
|||
return False
|
||||
else:
|
||||
port = 1965
|
||||
self.hostname = hostname
|
||||
|
||||
try:
|
||||
self.payload = self.url.encode()
|
||||
|
@ -154,26 +105,27 @@ class Request:
|
|||
try:
|
||||
self.ssock = context.wrap_socket(sock, server_hostname=hostname)
|
||||
except OSError as exc:
|
||||
sock.close()
|
||||
self.state = Request.STATE_CONNECTION_FAILED
|
||||
self.error = exc.strerror
|
||||
return False
|
||||
|
||||
der = self.ssock.getpeercert(binary_form=True)
|
||||
self.cert_validation = validate_cert(der, hostname, self.cert_stash)
|
||||
cert_status = self.cert_validation["status"]
|
||||
if cert_status == CertStatus.ERROR:
|
||||
self.cert_status, self.cert = \
|
||||
validate_cert(der, hostname, self.cert_stash)
|
||||
if self.cert_status == CertStatus.ERROR:
|
||||
self.abort()
|
||||
self.state = Request.STATE_ERROR_CERT
|
||||
return False
|
||||
if cert_status == CertStatus.WRONG_FINGERPRINT:
|
||||
if self.cert_status == CertStatus.WRONG_FINGERPRINT:
|
||||
self.abort()
|
||||
self.state = Request.STATE_UNTRUSTED_CERT
|
||||
return False
|
||||
|
||||
if cert_status == CertStatus.VALID_NEW:
|
||||
if self.cert_status in CERT_STATUS_INVALID:
|
||||
self.state = Request.STATE_INVALID_CERT
|
||||
elif self.cert_status == CertStatus.VALID_NEW:
|
||||
self.state = Request.STATE_UNKNOWN_CERT
|
||||
else: # self.cert_status in (VALID, VALID_NEW, INVALID_CERT)
|
||||
else: # self.cert_status == CertStatus.VALID
|
||||
self.state = Request.STATE_OK
|
||||
return True
|
||||
|
||||
|
@ -280,6 +232,6 @@ class Response:
|
|||
return response
|
||||
|
||||
@staticmethod
|
||||
def get_generic_code(code: int) -> int:
|
||||
def get_generic_code(code) -> int:
|
||||
"""Return the generic version (x0) of this code."""
|
||||
return code - (code % 10)
|
||||
|
|
10
bebop/tests/test_protocol.py
Normal file
10
bebop/tests/test_protocol.py
Normal file
|
@ -0,0 +1,10 @@
|
|||
import unittest
|
||||
|
||||
from ..protocol import parse_gemini_url
|
||||
|
||||
|
||||
class TestGemini(unittest.TestCase):
|
||||
|
||||
def test_parse_url(self):
|
||||
r1 = parse_gemini_url("gemini://dece.space")
|
||||
self.assertDictEqual(r1, {"host": "dece.space", "path": ""})
|
|
@ -1,9 +1,9 @@
|
|||
import unittest
|
||||
|
||||
from ..metalines import _explode_words, _find_next_sep, wrap_words
|
||||
from ..rendering import _explode_words, _find_next_sep, wrap_words
|
||||
|
||||
|
||||
class TestMetalines(unittest.TestCase):
|
||||
class TestRenderer(unittest.TestCase):
|
||||
|
||||
def test_wrap_words(self):
|
||||
t = "wrap me wrap me youcantwrapthisonewithoutforce bla bla bla bla"
|
149
bebop/tofu.py
149
bebop/tofu.py
|
@ -4,70 +4,19 @@ As of writing there is still some debate around it, so it is quite messy and
|
|||
requires more clarity both in specification and in our own implementation.
|
||||
"""
|
||||
|
||||
import datetime
|
||||
import hashlib
|
||||
import re
|
||||
from enum import Enum
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, Optional
|
||||
|
||||
from bebop.fs import get_user_data_path
|
||||
import asn1crypto.x509
|
||||
|
||||
|
||||
STASH_LINE_RE = re.compile(r"(\S+) (\S+) (\S+)")
|
||||
|
||||
WRONG_FP_ALERT = """\
|
||||
The request could not complete because the certificate presented by the server \
|
||||
does not match the certificate stored in the local stash.
|
||||
|
||||
``` details of the fingerprint mismatch
|
||||
Hostname: {hostname}
|
||||
Local fingerprint: {local_fp}
|
||||
Server fingerprint: {remote_fp}
|
||||
```
|
||||
|
||||
If you are sure this new certificate can be trusted, press ":" and type the \
|
||||
following command to remove the previous certificate from the local stash, \
|
||||
then retry your request:
|
||||
|
||||
``` command to use to forget about the previous certificate
|
||||
forget-certificate {hostname}
|
||||
```
|
||||
|
||||
You can also manually remove the certificate line from the known hosts file in \
|
||||
your user data directory.
|
||||
|
||||
## FAQ
|
||||
|
||||
### What is this mismatch about?
|
||||
|
||||
Gemini uses TOFU (Trust On First Use) to verify the identity of the server you \
|
||||
are visiting. It means that the first time you visited this capsule, it showed \
|
||||
you its unique ID, but this time the ID is different, so the trust is broken.
|
||||
|
||||
Capsule owners often tell in advance when they are about the use a new \
|
||||
certificate, but they may have forgotten or you may have missed it. Maybe the \
|
||||
old certificate expired and/or has been replaced for another reason (e.g. \
|
||||
using a far away expiration time, borking certificates during a migration, …)
|
||||
|
||||
### Am I being hacked?
|
||||
|
||||
Probably not, but if you are visiting a sensitive capsule, make sure you're \
|
||||
confident enough before trusting this new certificate.
|
||||
|
||||
### How to ensure this new certificate can be trusted?
|
||||
|
||||
Can you join the owner through mail or instant messaging? This is the simplest \
|
||||
way for you to make sure that the server is fine, and maybe alert the owner on \
|
||||
a problem on his server she did not notice.
|
||||
"""
|
||||
STASH_LINE_RE = re.compile(r"(\S+) (\S+) (\S+) (\d+)")
|
||||
|
||||
|
||||
def get_cert_stash_path() -> Path:
|
||||
"""Return the default certificate stash path."""
|
||||
return get_user_data_path() / "known_hosts.txt"
|
||||
|
||||
|
||||
def load_cert_stash(stash_path: Path) -> Optional[Dict]:
|
||||
def load_cert_stash(stash_path: Path):
|
||||
"""Load the certificate stash from the file, or None on error.
|
||||
|
||||
The stash is a dict with host names as keys and tuples as values. Tuples
|
||||
|
@ -87,8 +36,8 @@ def load_cert_stash(stash_path: Path) -> Optional[Dict]:
|
|||
match = STASH_LINE_RE.match(line)
|
||||
if not match:
|
||||
continue
|
||||
name, algo, fingerprint = match.groups()
|
||||
stash[name] = (algo, fingerprint, True)
|
||||
name, algo, fingerprint, timestamp = match.groups()
|
||||
stash[name] = (algo, fingerprint, timestamp, True)
|
||||
except (OSError, ValueError):
|
||||
return None
|
||||
return stash
|
||||
|
@ -98,67 +47,71 @@ def save_cert_stash(stash: dict, stash_path: Path):
|
|||
"""Save the certificate stash."""
|
||||
try:
|
||||
with open(stash_path, "wt") as stash_file:
|
||||
for name, entry in stash.items():
|
||||
algo, fingerprint, is_permanent = entry
|
||||
for name, entry in stash.values():
|
||||
algo, fingerprint, timestamp, is_permanent = entry
|
||||
if not is_permanent:
|
||||
continue
|
||||
entry_line = f"{name} {algo} {fingerprint}\n"
|
||||
entry_line = f"{name} {algo} {fingerprint} {timestamp}\n"
|
||||
stash_file.write(entry_line)
|
||||
except (OSError, ValueError) as exc:
|
||||
print(f"Failed to save certificate stash '{stash_path}': {exc}")
|
||||
except (OSError, ValueError):
|
||||
pass
|
||||
|
||||
|
||||
class CertStatus(Enum):
|
||||
"""Value returned by validate_cert."""
|
||||
# Cert is valid: proceed.
|
||||
VALID = 0 # Known and valid.
|
||||
VALID_NEW = 1 # New and valid.
|
||||
VALID_NEW = 7 # New and valid.
|
||||
# Cert is unusable or wrong: abort.
|
||||
ERROR = 2 # General error.
|
||||
WRONG_FINGERPRINT = 3 # Fingerprint in the stash is different.
|
||||
ERROR = 1 # General error.
|
||||
WRONG_FINGERPRINT = 2 # Fingerprint in the stash is different.
|
||||
# Cert has some issues: ask to proceed.
|
||||
NOT_VALID_YET = 3 # not-before date invalid.
|
||||
EXPIRED = 4 # not-after date invalid.
|
||||
BAD_DOMAIN = 5 # Host name is not in cert's valid domains.
|
||||
|
||||
|
||||
def validate_cert(der, hostname, cert_stash) -> Dict[str, Any]:
|
||||
"""Return a dict containing validation info for this certificate.
|
||||
CERT_STATUS_INVALID = (
|
||||
CertStatus.NOT_VALID_YET,
|
||||
CertStatus.EXPIRED,
|
||||
CertStatus.BAD_DOMAIN,
|
||||
)
|
||||
|
||||
Returns:
|
||||
The validation dict can contain two keys:
|
||||
- status: CertStatus, always present.
|
||||
- hash: DER hash to be used as certificate fingerprint, present if status is
|
||||
not CertStatus.ERROR.
|
||||
- saved_hash: fingerprint for this hostname in the local stash, present if
|
||||
status is CertStatus.WRONG_FINGERPRINT.
|
||||
"""
|
||||
|
||||
def validate_cert(der, hostname, cert_stash):
|
||||
"""Return a tuple (CertStatus, Certificate) for this certificate."""
|
||||
if der is None:
|
||||
return {"status": CertStatus.ERROR}
|
||||
return CertStatus.ERROR, None
|
||||
try:
|
||||
cert = asn1crypto.x509.Certificate.load(der)
|
||||
except ValueError:
|
||||
return CertStatus.ERROR, None
|
||||
|
||||
known = False
|
||||
# Check for sane parameters.
|
||||
now = datetime.datetime.now(tz=datetime.timezone.utc)
|
||||
if now < cert.not_valid_before:
|
||||
return CertStatus.NOT_VALID_YET, cert
|
||||
if now > cert.not_valid_after:
|
||||
return CertStatus.EXPIRED, cert
|
||||
if hostname not in cert.valid_domains:
|
||||
return CertStatus.BAD_DOMAIN, cert
|
||||
|
||||
# Check the entire certificate fingerprint.
|
||||
cert_hash = hashlib.sha512(der).hexdigest()
|
||||
result = {"hash": cert_hash} # type: Dict[str, Any]
|
||||
if hostname in cert_stash:
|
||||
_, fingerprint, _ = cert_stash[hostname]
|
||||
_, fingerprint, timestamp, _ = cert_stash[hostname]
|
||||
if timestamp >= now.timestamp():
|
||||
if cert_hash != fingerprint:
|
||||
result.update(
|
||||
status=CertStatus.WRONG_FINGERPRINT,
|
||||
saved_hash=fingerprint
|
||||
)
|
||||
return result
|
||||
known = True
|
||||
return CertStatus.WRONG_FINGERPRINT, cert
|
||||
else:
|
||||
# Disregard expired fingerprints.
|
||||
pass
|
||||
return CertStatus.VALID, cert
|
||||
|
||||
result.update(status=CertStatus.VALID if known else CertStatus.VALID_NEW)
|
||||
return result
|
||||
# The certificate is unknown and valid.
|
||||
return CertStatus.VALID_NEW, cert
|
||||
|
||||
|
||||
def trust_fingerprint(stash, hostname, algo, fingerprint, trust_always=False):
|
||||
"""Add a fingerprint entry to this stash."""
|
||||
stash[hostname] = (algo, fingerprint, trust_always)
|
||||
|
||||
|
||||
def untrust_fingerprint(stash, hostname):
|
||||
"""Remove a fingerprint entry from this stash; return True on deletion."""
|
||||
if hostname in stash:
|
||||
del stash[hostname]
|
||||
return True
|
||||
return False
|
||||
def trust(cert_stash, hostname, algo, fingerprint, timestamp,
|
||||
trust_always=False):
|
||||
cert_stash[hostname] = (algo, fingerprint, timestamp, trust_always)
|
||||
|
|
1
requirements.txt
Normal file
1
requirements.txt
Normal file
|
@ -0,0 +1 @@
|
|||
asn1crypto
|
Reference in a new issue