Compare commits

..

3 commits

Author SHA1 Message Date
dece 1fddf4c2b2 tofu: proper implementation 2021-04-19 02:04:18 +02:00
dece 80ec71f30b command_line: add a prompt function 2021-04-19 00:28:20 +02:00
dece 396391ea80 help: add help page 2021-04-18 16:13:41 +02:00
13 changed files with 432 additions and 146 deletions

View file

@ -12,13 +12,13 @@ TODO DONE
view/edit sources view/edit sources
downloads downloads
configuration configuration
help page
TOFU
open last download open last download
actual TOFU
home page home page
media files media files
view history view history
identity management identity management
help page for keybinds
-------------------------------------------------------------------------------- --------------------------------------------------------------------------------
BACKLOG BACKLOG
click on links to open them click on links to open them

View file

@ -29,11 +29,13 @@ Why use Bebop instead of something else?
### Lightweight ### Lightweight
It only uses a single dependency, [asn1crypto][asn1crypto], to delegate It does not use any external dependencies. Everything including NCurses or TLS
parsing certificates. Everything else including NCurses or TLS is done using is done using Python's standard library.
Python's standard library.
[asn1crypto]: https://github.com/wbond/asn1crypto ### Nice keybinds
A lot of keybinds are defined, and Vim users should get quickly familiar with
them. Find them in the help page by pressing `?`.
### Fun ### Fun

View file

@ -2,8 +2,8 @@ import argparse
from bebop.browser.browser import Browser from bebop.browser.browser import Browser
from bebop.config import load_config from bebop.config import load_config
from bebop.fs import get_config_path, get_user_data_path from bebop.fs import ensure_bebop_files_exist, get_config_path
from bebop.tofu import load_cert_stash, save_cert_stash from bebop.tofu import get_cert_stash_path, load_cert_stash, save_cert_stash
def main(): def main():
@ -19,11 +19,9 @@ def main():
config_path = get_config_path() config_path = get_config_path()
config = load_config(config_path) config = load_config(config_path)
user_data_path = get_user_data_path() ensure_bebop_files_exist()
if not user_data_path.exists():
user_data_path.mkdir()
cert_stash_path = user_data_path / "known_hosts.txt" cert_stash_path = get_cert_stash_path()
cert_stash = load_cert_stash(cert_stash_path) or {} cert_stash = load_cert_stash(cert_stash_path) or {}
try: try:
Browser(config, cert_stash).run(start_url=start_url) Browser(config, cert_stash).run(start_url=start_url)

View file

@ -13,6 +13,7 @@ from bebop.bookmarks import (
from bebop.colors import ColorPair, init_colors from bebop.colors import ColorPair, init_colors
from bebop.command_line import CommandLine from bebop.command_line import CommandLine
from bebop.external import open_external_program from bebop.external import open_external_program
from bebop.help import HELP_PAGE
from bebop.history import History from bebop.history import History
from bebop.links import Links from bebop.links import Links
from bebop.mouse import ButtonState from bebop.mouse import ButtonState
@ -23,11 +24,30 @@ from bebop.page_pad import PagePad
class Browser: class Browser:
"""Manage the events, inputs and rendering.""" """Manage the events, inputs and rendering.
Attributes:
- config: config dict passed to the browser.
- stash: certificate stash passed to the browser.
- screen: curses stdscr.
- dim: current screen dimensions.
- page_pad: curses pad containing the current page view.
- status_line: curses window used to report current status.
- command_line: a CommandLine object for the user to interact with.
- running: the browser will continue running while this is true.
- status_data: 3-uple of status text, color pair and attributes of the
status line, used to reset status after an error.
- history: an History object.
- cache: a dict containing cached pages
- special_pages: a dict containing page names used with "bebop" scheme;
values are dicts as well: the "open" key maps to a callable to use when
the page is accessed, and the optional "source" key maps to callable
returning the page source path.
"""
def __init__(self, config, cert_stash): def __init__(self, config, cert_stash):
self.config = config self.config = config
self.stash = cert_stash or {} self.stash = cert_stash
self.screen = None self.screen = None
self.dim = (0, 0) self.dim = (0, 0)
self.page_pad = None self.page_pad = None
@ -37,6 +57,7 @@ class Browser:
self.status_data = ("", 0, 0) self.status_data = ("", 0, 0)
self.history = History() self.history = History()
self.cache = {} self.cache = {}
self.special_pages = self.setup_special_pages()
self._current_url = "" self._current_url = ""
@property @property
@ -58,6 +79,18 @@ class Browser:
self._current_url = url self._current_url = url
self.set_status(url) self.set_status(url)
def setup_special_pages(self):
"""Return a dict with the special pages functions."""
return {
"bookmarks": {
"open": self.open_bookmarks,
"source": lambda: str(get_bookmarks_path())
},
"help": {
"open": self.open_help,
},
}
def run(self, *args, **kwargs): def run(self, *args, **kwargs):
"""Use curses' wrapper around _run.""" """Use curses' wrapper around _run."""
os.environ.setdefault("ESCDELAY", "25") os.environ.setdefault("ESCDELAY", "25")
@ -99,7 +132,9 @@ class Browser:
def handle_inputs(self): def handle_inputs(self):
char = self.screen.getch() char = self.screen.getch()
if char == ord(":"): if char == ord("?"):
self.open_help()
elif char == ord(":"):
self.quick_command("") self.quick_command("")
elif char == ord("r"): elif char == ord("r"):
self.reload_page() self.reload_page()
@ -218,11 +253,11 @@ class Browser:
def quick_command(self, command): def quick_command(self, command):
"""Shortcut method to take user input with a prefixed command string.""" """Shortcut method to take user input with a prefixed command string."""
prefix = f"{command} " if command else "" prefix = command + " " if command else ""
user_input = self.command_line.focus(":", prefix=prefix) text = self.command_line.focus(CommandLine.CHAR_COMMAND, prefix=prefix)
if not user_input: if not text:
return return
self.process_command(user_input) self.process_command(text)
def process_command(self, command_text: str): def process_command(self, command_text: str):
"""Handle a client command.""" """Handle a client command."""
@ -237,6 +272,9 @@ class Browser:
return return
if command in ("o", "open"): if command in ("o", "open"):
self.open_url(words[1], assume_absolute=True) self.open_url(words[1], assume_absolute=True)
elif command == "forget-certificate":
from bebop.browser.gemini import forget_certificate
forget_certificate(self, words[1])
def open_url(self, url, base_url=None, redirects=0, assume_absolute=False, def open_url(self, url, base_url=None, redirects=0, assume_absolute=False,
history=True, use_cache=True): history=True, use_cache=True):
@ -287,8 +325,11 @@ class Browser:
from bebop.browser.file import open_file from bebop.browser.file import open_file
open_file(self, parts.path, history=history) open_file(self, parts.path, history=history)
elif parts.scheme == "bebop": elif parts.scheme == "bebop":
if parts.netloc == "bookmarks": special_page = self.special_pages.get(parts.netloc)
self.open_bookmarks() if special_page:
special_page["open"]()
else:
self.set_status_error("Unknown page.")
else: else:
self.set_status_error(f"Protocol {parts.scheme} not supported.") self.set_status_error(f"Protocol {parts.scheme} not supported.")
@ -442,7 +483,10 @@ class Browser:
return return
self.set_status("Bookmark title?") self.set_status("Bookmark title?")
current_title = self.page_pad.current_page.title or "" current_title = self.page_pad.current_page.title or ""
title = self.command_line.focus(">", prefix=current_title) title = self.command_line.focus(
CommandLine.CHAR_TEXT,
prefix=current_title
)
if title: if title:
title = title.strip() title = title.strip()
if title: if title:
@ -458,11 +502,13 @@ class Browser:
directly from their location on disk. directly from their location on disk.
""" """
delete_source_after = False delete_source_after = False
special_pages = { if self.current_url.startswith("bebop://"):
"bebop://bookmarks": str(get_bookmarks_path()) page_name = self.current_url[len("bebop://"):]
} special_pages_functions = self.special_pages.get(page_name)
if self.current_url in special_pages: if not special_pages_functions:
source_filename = special_pages[self.current_url] return
get_source = special_pages_functions.get("source")
source_filename = get_source() if get_source else None
else: else:
if not self.page_pad.current_page: if not self.page_pad.current_page:
return return
@ -472,8 +518,21 @@ class Browser:
source_filename = source_file.name source_filename = source_file.name
delete_source_after = True delete_source_after = True
if not source_filename:
return
command = self.config["source_editor"] + [source_filename] command = self.config["source_editor"] + [source_filename]
open_external_program(command) open_external_program(command)
if delete_source_after: if delete_source_after:
os.unlink(source_filename) os.unlink(source_filename)
self.refresh_windows() self.refresh_windows()
def open_help(self):
"""Show the help page."""
self.load_page(Page.from_gemtext(HELP_PAGE, self.config["text_width"]))
self.current_url = "bebop://help"
def prompt(self, text, keys):
"""Display the text and allow it to type one of the given keys."""
self.set_status(text)
return self.command_line.prompt_key(keys)

View file

@ -3,10 +3,12 @@
from pathlib import Path from pathlib import Path
from bebop.browser.browser import Browser from bebop.browser.browser import Browser
from bebop.command_line import CommandLine
from bebop.fs import get_downloads_path from bebop.fs import get_downloads_path
from bebop.navigation import set_parameter from bebop.navigation import set_parameter
from bebop.page import Page from bebop.page import Page
from bebop.protocol import Request, Response from bebop.protocol import Request, Response
from bebop.tofu import trust_fingerprint, untrust_fingerprint, WRONG_FP_ALERT
MAX_URL_LEN = 1024 MAX_URL_LEN = 1024
@ -16,10 +18,30 @@ def open_gemini_url(browser: Browser, url, redirects=0, history=True,
use_cache=True): use_cache=True):
"""Open a Gemini URL and set the formatted response as content. """Open a Gemini URL and set the formatted response as content.
After initiating the connection, TODO While the specification is not set in stone, every client takes a slightly
different approach to enforcing TOFU. Read the `Request.connect` docs to
find about cases where connection is aborted without asking the user. What
interests us here is what happens when the user should decide herself? This
happens in several cases, matching the request possible states. Here is
what Bebop do (or want to do):
- STATE_INVALID_CERT: the certificate has non-fatal issues; we may
present the user the problems found and let her decide whether to trust
temporarily the certificate or not BUT we currently do not parse the
certificate's fields, so this state is never used.
- STATE_UNKNOWN_CERT: the certificate is valid but has not been seen before;
as we're doing TOFU here, we could automatically trust it or let the user
choose. For simplicity, we always trust it permanently.
Attributes:
- browser: Browser object making the request.
- url: a valid URL with Gemini scheme to open.
- redirects: current amount of redirections done to open the initial URL.
- history: if true, save the final URL to history.
- use_cache: if true, look up if the page is cached before requesting it.
""" """
if len(url) >= MAX_URL_LEN: if len(url) >= MAX_URL_LEN:
browser.set_status_error(f"Request URL too long.") browser.set_status_error("Request URL too long.")
return return
browser.set_status(f"Loading {url}") browser.set_status(f"Loading {url}")
@ -39,10 +61,10 @@ def open_gemini_url(browser: Browser, url, redirects=0, history=True,
if req.state == Request.STATE_ERROR_CERT: if req.state == Request.STATE_ERROR_CERT:
error = f"Certificate was missing or corrupt ({url})." error = f"Certificate was missing or corrupt ({url})."
elif req.state == Request.STATE_UNTRUSTED_CERT: elif req.state == Request.STATE_UNTRUSTED_CERT:
_handle_untrusted_cert(browser, req)
error = f"Certificate has been changed ({url})." error = f"Certificate has been changed ({url})."
# TODO propose the user ways to handle this.
elif req.state == Request.STATE_CONNECTION_FAILED: elif req.state == Request.STATE_CONNECTION_FAILED:
error_details = f": {req.error}" if req.error else "." error_details = ": " + req.error if req.error else "."
error = f"Connection failed ({url})" + error_details error = f"Connection failed ({url})" + error_details
else: else:
error = f"Connection failed ({url})." error = f"Connection failed ({url})."
@ -50,13 +72,18 @@ def open_gemini_url(browser: Browser, url, redirects=0, history=True,
return return
if req.state == Request.STATE_INVALID_CERT: if req.state == Request.STATE_INVALID_CERT:
# TODO propose abort / temp trust
pass pass
elif req.state == Request.STATE_UNKNOWN_CERT: elif req.state == Request.STATE_UNKNOWN_CERT:
# TODO propose abort / temp trust / perm trust # Certificate is valid but unknown: trust it permanently.
pass hostname = req.hostname
else: fingerprint = req.cert_validation["hash"]
pass # TODO trust_fingerprint(
browser.stash,
hostname,
"SHA-512",
fingerprint,
trust_always=True
)
data = req.proceed() data = req.proceed()
if not data: if not data:
@ -67,21 +94,47 @@ def open_gemini_url(browser: Browser, url, redirects=0, history=True,
browser.set_status_error(f"Server response parsing failed ({url}).") browser.set_status_error(f"Server response parsing failed ({url}).")
return return
_handle_response(browser, response, url, redirects, history)
def _handle_untrusted_cert(browser: Browser, request: Request):
"""Handle a mismatch between known & server fingerprints.
This function formats an alert page to explain to the user what the hell is
going on and displays it.
"""
remote_fp = request.cert_validation["hash"]
local_fp = request.cert_validation["saved_hash"]
alert_page_source = WRONG_FP_ALERT.format(
hostname=request.hostname,
local_fp=local_fp,
remote_fp=remote_fp,
)
alert_page = Page.from_gemtext(
alert_page_source,
browser.config["text_width"]
)
browser.load_page(alert_page)
def _handle_response(browser: Browser, response: Response, url: str,
redirects: int, history: bool):
"""Handle a response from a Gemini server."""
if response.code == 20: if response.code == 20:
handle_response_content(browser, url, response, history) _handle_successful_response(browser, response, url, history)
elif response.generic_code == 30 and response.meta: elif response.generic_code == 30 and response.meta:
browser.open_url(response.meta, base_url=url, redirects=redirects + 1) browser.open_url(response.meta, base_url=url, redirects=redirects + 1)
elif response.generic_code in (40, 50): elif response.generic_code in (40, 50):
error = f"Server error: {response.meta or Response.code.name}" error = f"Server error: {response.meta or Response.code.name}"
browser.set_status_error(error) browser.set_status_error(error)
elif response.generic_code == 10: elif response.generic_code == 10:
handle_input_request(browser, url, response.meta) _handle_input_request(browser, url, response.meta)
else: else:
error = f"Unhandled response code {response.code}" error = f"Unhandled response code {response.code}"
browser.set_status_error(error) browser.set_status_error(error)
def handle_response_content(browser: Browser, url: str, response: Response, def _handle_successful_response(browser: Browser, response: Response, url: str,
history: bool): history: bool):
"""Handle a successful response content from a Gemini server. """Handle a successful response content from a Gemini server.
@ -116,7 +169,7 @@ def handle_response_content(browser: Browser, url: str, response: Response,
text = response.content.decode("utf-8", errors="replace") text = response.content.decode("utf-8", errors="replace")
page = Page.from_text(text) page = Page.from_text(text)
else: else:
filepath = get_download_path(url) filepath = _get_download_path(url)
if page: if page:
browser.load_page(page) browser.load_page(page)
@ -137,7 +190,7 @@ def handle_response_content(browser: Browser, url: str, response: Response,
browser.set_status_error(error) browser.set_status_error(error)
def get_download_path(url: str) -> Path: def _get_download_path(url: str) -> Path:
"""Try to find the best download file path possible from this URL.""" """Try to find the best download file path possible from this URL."""
download_dir = get_downloads_path() download_dir = get_downloads_path()
url_parts = url.rsplit("/", maxsplit=1) url_parts = url.rsplit("/", maxsplit=1)
@ -149,14 +202,26 @@ def get_download_path(url: str) -> Path:
return download_dir / filename return download_dir / filename
def handle_input_request(browser: Browser, from_url: str, message: str =None): def _handle_input_request(browser: Browser, from_url: str, message: str =None):
"""Focus command-line to pass input to the server.""" """Focus command-line to pass input to the server."""
if message: if message:
browser.set_status(f"Input needed: {message}") browser.set_status(f"Input needed: {message}")
else: else:
browser.set_status("Input needed:") browser.set_status("Input needed:")
user_input = browser.command_line.focus("?") user_input = browser.command_line.focus(CommandLine.CHAR_TEXT)
if not user_input: if not user_input:
return return
url = set_parameter(from_url, user_input) url = set_parameter(from_url, user_input)
open_gemini_url(browser, url) open_gemini_url(browser, url)
def forget_certificate(browser: Browser, hostname: str):
"""Remove the fingerprint associated to this hostname for the cert stash."""
key = browser.prompt(f"Remove fingerprint from {hostname}? [y/N]", "ynN")
if key != "y":
browser.reset_status()
return
if untrust_fingerprint(browser.stash, hostname):
browser.set_status(f"Known certificate for {hostname} removed.")
else:
browser.set_status_error(f"Known certificate for {hostname} not found.")

View file

@ -17,18 +17,35 @@ class CommandLine:
the window's right border when writing more content than the width allows. the window's right border when writing more content than the width allows.
Therefore I just added the M-e keybind to call an external editor and use Therefore I just added the M-e keybind to call an external editor and use
its content as result. its content as result.
Attributes:
- window: curses window to use for the command line and Textbox.
- editor_command: external command to use to edit content externally.
- textbox: Textbox object handling user input.
""" """
CHAR_COMMAND = ":"
CHAR_DIGIT = "&"
CHAR_TEXT = ">"
def __init__(self, window, editor_command): def __init__(self, window, editor_command):
self.window = window self.window = window
self.editor_command = editor_command self.editor_command = editor_command
self.textbox = None self.textbox = curses.textpad.Textbox(self.window)
def clear(self): def clear(self):
"""Clear command-line contents.""" """Clear command-line contents."""
self.window.clear() self.window.clear()
self.window.refresh() self.window.refresh()
def gather(self):
"""Return the string currently written by the user in command line.
This doesn't count the command char used, but it includes then prefix.
Trailing whitespace is trimmed.
"""
return self.textbox.gather()[1:].rstrip()
def focus(self, command_char, validator=None, prefix=""): def focus(self, command_char, validator=None, prefix=""):
"""Give user focus to the command bar. """Give user focus to the command bar.
@ -46,13 +63,13 @@ class CommandLine:
User input as string. The string will be empty if the validator raised User input as string. The string will be empty if the validator raised
an EscapeInterrupt. an EscapeInterrupt.
""" """
validator = validator or self._validate_common_input
self.window.clear() self.window.clear()
self.window.refresh() self.window.refresh()
self.textbox = curses.textpad.Textbox(self.window)
self.window.addstr(command_char + prefix) self.window.addstr(command_char + prefix)
curses.curs_set(1) curses.curs_set(1)
try: try:
command = self.textbox.edit(validator or self.validate_common_input) command = self.textbox.edit(validator)
except EscapeCommandInterrupt: except EscapeCommandInterrupt:
command = "" command = ""
except TerminateCommandInterrupt as exc: except TerminateCommandInterrupt as exc:
@ -63,11 +80,7 @@ class CommandLine:
self.clear() self.clear()
return command return command
def gather(self): def _validate_common_input(self, ch: int):
"""Return the string currently written by the user in command line."""
return self.textbox.gather()[1:].rstrip()
def validate_common_input(self, ch: int):
"""Generic input validator, handles a few more cases than default. """Generic input validator, handles a few more cases than default.
This validator can be used as a default validator as it handles, on top This validator can be used as a default validator as it handles, on top
@ -136,8 +149,8 @@ class CommandLine:
if len(candidates) == 1: if len(candidates) == 1:
return 0, candidates[0] return 0, candidates[0]
# Else, focus the command line to let the user input more digits. # Else, focus the command line to let the user input more digits.
validator = lambda ch: self.validate_link_digit(ch, links, max_digits) validator = lambda ch: self._validate_link_digit(ch, links, max_digits)
link_input = self.focus("&", validator, digit) link_input = self.focus(CommandLine.CHAR_DIGIT, validator, digit)
if not link_input: if not link_input:
return 1, None return 1, None
try: try:
@ -146,10 +159,10 @@ class CommandLine:
return 2, f"Invalid link ID {link_input}." return 2, f"Invalid link ID {link_input}."
return 0, link_id return 0, link_id
def validate_link_digit(self, ch: int, links: Links, max_digits: int): def _validate_link_digit(self, ch: int, links: Links, max_digits: int):
"""Handle input chars to be used as link ID.""" """Handle input chars to be used as link ID."""
# Handle common chars. # Handle common chars.
ch = self.validate_common_input(ch) ch = self._validate_common_input(ch)
# Only accept digits. If we reach the amount of required digits, open # Only accept digits. If we reach the amount of required digits, open
# link now and leave command line. Else just process it. # link now and leave command line. Else just process it.
if curses.ascii.isdigit(ch): if curses.ascii.isdigit(ch):
@ -185,6 +198,25 @@ class CommandLine:
return return
raise TerminateCommandInterrupt(content) raise TerminateCommandInterrupt(content)
def prompt_key(self, keys):
"""Focus the command line and wait for the user """
validator = lambda ch: self._validate_prompt(ch, keys)
key = self.focus(CommandLine.CHAR_TEXT, validator)
return key if key in keys else ""
def _validate_prompt(self, ch: int, keys):
"""Handle input chars and raise a terminate interrupt on a valid key."""
# Handle common keys.
ch = self._validate_common_input(ch)
try:
char = chr(ch)
except ValueError:
pass
else:
if char in keys:
raise TerminateCommandInterrupt(char)
return 0
class EscapeCommandInterrupt(Exception): class EscapeCommandInterrupt(Exception):
"""Signal that ESC has been pressed during command line.""" """Signal that ESC has been pressed during command line."""

View file

@ -45,3 +45,11 @@ def get_downloads_path() -> Path:
if download_path: if download_path:
return Path(download_path) return Path(download_path)
return Path.home() return Path.home()
def ensure_bebop_files_exist():
"""Ensure various Bebop's files or directories are present."""
# Ensure the user data directory exists.
user_data_path = get_user_data_path()
if not user_data_path.exists():
user_data_path.mkdir(parents=True)

38
bebop/help.py Normal file
View file

@ -0,0 +1,38 @@
"""Help page. Currently only keybinds are shown as help."""
HELP_PAGE = """\
# Bebop keybinds
Keybinds using the SHIFT key are written uppercase. Keybinds using the ALT (or \
META) key are written using the "M-" prefix. Symbol keys are written as their \
name, not the symbol itself.
``` list of bebop keybinds
- colon: focus the command-line
- r: reload page
- h: scroll left a bit
- j: scroll down a bit
- k: scroll up a bit
- l: scroll right a bit
- H: scroll left a whole page
- J: scroll down a whole page
- K: scroll up a whole page
- L: scroll right a whole page
- M-h: scroll one column left
- M-j: scroll one line down
- M-k: scroll one line up
- M-l: scroll one column right
- circumflex: horizontally scroll back to the first column
- gg: go to the top of the page
- G: go to the bottom of the page
- o: open an URL
- p: go to the previous page
- u: go to the parent page (up a level in URL)
- U: go to the root page (root URL for the current domain)
- b: open bookmarks
- B: add current page to bookmarks
- e: open the current page source in an editor
- digits: go to the corresponding link ID
- escape: reset status line text
```
"""

View file

@ -8,19 +8,13 @@ from enum import IntEnum
from typing import Optional from typing import Optional
from bebop.mime import DEFAULT_MIME_TYPE, MimeType from bebop.mime import DEFAULT_MIME_TYPE, MimeType
from bebop.tofu import CertStatus, CERT_STATUS_INVALID, validate_cert from bebop.tofu import CertStatus, validate_cert
GEMINI_URL_RE = re.compile(r"gemini://(?P<host>[^/]+)(?P<path>.*)") GEMINI_URL_RE = re.compile(r"gemini://(?P<host>[^/]+)(?P<path>.*)")
LINE_TERM = b"\r\n" LINE_TERM = b"\r\n"
def parse_gemini_url(url):
"""Return a dict containing the hostname and the request path, or None."""
match = GEMINI_URL_RE.match(url)
return match.groupdict() if match else None
class Request: class Request:
"""A Gemini request. """A Gemini request.
@ -30,8 +24,21 @@ class Request:
sending the request header and receiving the response: sending the request header and receiving the response:
1. Instantiate a Request. 1. Instantiate a Request.
2. `connect` opens the connection, leaves the caller free to check stuff. 2. `connect` opens the connection and aborts it or leaves the caller free to
check stuff.
3. `proceed` or `abort` can be called. 3. `proceed` or `abort` can be called.
Attributes:
- url: URL to open.
- cert_stash: certificate stash to use an possibly update.
- state: request state.
- hostname: hostname derived from url, stored when `connect` is called.
- payload: bytes object of the payload request; build during `connect`, used
during `proceed`.
- ssock: TLS-wrapped socket.
- cert_validation: validation results dict, set after certificate has been
reviewed.
- error: human-readable connection error, may be set during `connect`.
""" """
# Initial state, connection is not established yet. # Initial state, connection is not established yet.
@ -55,28 +62,69 @@ class Request:
self.url = url self.url = url
self.cert_stash = cert_stash self.cert_stash = cert_stash
self.state = Request.STATE_INIT self.state = Request.STATE_INIT
self.hostname = ""
self.payload = b"" self.payload = b""
self.ssock = None self.ssock = None
self.cert = None self.cert_validation = None
self.cert_status = None
self.error = "" self.error = ""
def connect(self, timeout): def connect(self, timeout: int) -> bool:
"""Connect to a Gemini server and return a RequestEventType. """Connect to a Gemini server and return a RequestEventType.
Return True if the connection is established. The caller has to verify Return True if the connection is established. The caller has to verify
the request state and propose appropriate choices to the user if the the request state and propose appropriate choices to the user if the
certificate status is not CertStatus.VALID (Request.STATE_OK). certificate status is not CertStatus.VALID (Request.STATE_OK).
If connect returns False, the secure socket is aborted before return. If If connect returns False, the secure socket is aborted before return so
connect returns True, it is up to the caller to decide whether to there is no need to call `abort`. If connect returns True, it is up to the
continue (call proceed) the connection or abort it (call abort). caller to decide whether to continue (call `proceed`) the connection or
abort it (call `abort`).
The request `state` is updated to reflect the connection state after the
function returns. The following list describes states related to
connection failure (False returned):
- STATE_INVALID_URL: URL is not valid.
- STATE_CONNECTION_FAILED: connection failed, either TCP timeout or
local TLS failure. Additionally, the request `error` attribute is set
to an error string describing the issue.
For all request states from now on, the `cert_validation` attribute is
updated with the result of the certificate validation.
The following list describes states related to validation failure (False
returned):
- STATE_ERROR_CERT: server certificate could not be validated at all.
- STATE_UNTRUSTED_CERT: server certificate mismatched the known
certificate for that hostname. The user should be presented with
options to solve the matter.
For other states, the connection is not aborted (True returned):
- STATE_INVALID_CERT: the certificate has one or more issues, e.g.
mismatching hostname or it is expired.
- STATE_UNKNOWN_CERT: the certificate is valid but unknown.
- STATE_OK: the certificate is valid and matches the known certificate
of that hostname.
After this function returns, the request state cannot be STATE_INIT.
Additional notes:
- The DER hash is compared against the fingerprint for this hostname
*and port*; the specification does not tell much about that, but we
are slightly more restrictive here by adding the port in the equation.
- The state STATE_INVALID_CERT is actually never used in Bebop because
of the current tendency to ignore any certificate fields and only
check the whole cert fingerprint. Here it is considered the same as a
valid certificate.
""" """
url_parts = parse_gemini_url(self.url) url_parts = GEMINI_URL_RE.match(self.url)
if not url_parts: if not url_parts:
self.state = Request.STATE_INVALID_URL self.state = Request.STATE_INVALID_URL
return False return False
hostname = url_parts["host"] hostname = url_parts.groupdict()["host"]
if ":" in hostname: if ":" in hostname:
hostname, port = hostname.split(":", maxsplit=1) hostname, port = hostname.split(":", maxsplit=1)
try: try:
@ -86,6 +134,7 @@ class Request:
return False return False
else: else:
port = 1965 port = 1965
self.hostname = hostname
try: try:
self.payload = self.url.encode() self.payload = self.url.encode()
@ -105,27 +154,26 @@ class Request:
try: try:
self.ssock = context.wrap_socket(sock, server_hostname=hostname) self.ssock = context.wrap_socket(sock, server_hostname=hostname)
except OSError as exc: except OSError as exc:
sock.close()
self.state = Request.STATE_CONNECTION_FAILED self.state = Request.STATE_CONNECTION_FAILED
self.error = exc.strerror self.error = exc.strerror
return False return False
der = self.ssock.getpeercert(binary_form=True) der = self.ssock.getpeercert(binary_form=True)
self.cert_status, self.cert = \ self.cert_validation = validate_cert(der, hostname, self.cert_stash)
validate_cert(der, hostname, self.cert_stash) cert_status = self.cert_validation["status"]
if self.cert_status == CertStatus.ERROR: if cert_status == CertStatus.ERROR:
self.abort() self.abort()
self.state = Request.STATE_ERROR_CERT self.state = Request.STATE_ERROR_CERT
return False return False
if self.cert_status == CertStatus.WRONG_FINGERPRINT: if cert_status == CertStatus.WRONG_FINGERPRINT:
self.abort() self.abort()
self.state = Request.STATE_UNTRUSTED_CERT self.state = Request.STATE_UNTRUSTED_CERT
return False return False
if self.cert_status in CERT_STATUS_INVALID: if cert_status == CertStatus.VALID_NEW:
self.state = Request.STATE_INVALID_CERT
elif self.cert_status == CertStatus.VALID_NEW:
self.state = Request.STATE_UNKNOWN_CERT self.state = Request.STATE_UNKNOWN_CERT
else: # self.cert_status == CertStatus.VALID else: # self.cert_status in (VALID, VALID_NEW, INVALID_CERT)
self.state = Request.STATE_OK self.state = Request.STATE_OK
return True return True
@ -232,6 +280,6 @@ class Response:
return response return response
@staticmethod @staticmethod
def get_generic_code(code) -> int: def get_generic_code(code: int) -> int:
"""Return the generic version (x0) of this code.""" """Return the generic version (x0) of this code."""
return code - (code % 10) return code - (code % 10)

View file

@ -1,9 +1,9 @@
import unittest import unittest
from ..rendering import _explode_words, _find_next_sep, wrap_words from ..metalines import _explode_words, _find_next_sep, wrap_words
class TestRenderer(unittest.TestCase): class TestMetalines(unittest.TestCase):
def test_wrap_words(self): def test_wrap_words(self):
t = "wrap me wrap me youcantwrapthisonewithoutforce bla bla bla bla" t = "wrap me wrap me youcantwrapthisonewithoutforce bla bla bla bla"

View file

@ -1,10 +0,0 @@
import unittest
from ..protocol import parse_gemini_url
class TestGemini(unittest.TestCase):
def test_parse_url(self):
r1 = parse_gemini_url("gemini://dece.space")
self.assertDictEqual(r1, {"host": "dece.space", "path": ""})

View file

@ -4,19 +4,70 @@ As of writing there is still some debate around it, so it is quite messy and
requires more clarity both in specification and in our own implementation. requires more clarity both in specification and in our own implementation.
""" """
import datetime
import hashlib import hashlib
import re import re
from enum import Enum from enum import Enum
from pathlib import Path from pathlib import Path
from typing import Any, Dict, Optional
import asn1crypto.x509 from bebop.fs import get_user_data_path
STASH_LINE_RE = re.compile(r"(\S+) (\S+) (\S+) (\d+)") STASH_LINE_RE = re.compile(r"(\S+) (\S+) (\S+)")
WRONG_FP_ALERT = """\
The request could not complete because the certificate presented by the server \
does not match the certificate stored in the local stash.
``` details of the fingerprint mismatch
Hostname: {hostname}
Local fingerprint: {local_fp}
Server fingerprint: {remote_fp}
```
If you are sure this new certificate can be trusted, press ":" and type the \
following command to remove the previous certificate from the local stash, \
then retry your request:
``` command to use to forget about the previous certificate
forget-certificate {hostname}
```
You can also manually remove the certificate line from the known hosts file in \
your user data directory.
## FAQ
### What is this mismatch about?
Gemini uses TOFU (Trust On First Use) to verify the identity of the server you \
are visiting. It means that the first time you visited this capsule, it showed \
you its unique ID, but this time the ID is different, so the trust is broken.
Capsule owners often tell in advance when they are about the use a new \
certificate, but they may have forgotten or you may have missed it. Maybe the \
old certificate expired and/or has been replaced for another reason (e.g. \
using a far away expiration time, borking certificates during a migration, )
### Am I being hacked?
Probably not, but if you are visiting a sensitive capsule, make sure you're \
confident enough before trusting this new certificate.
### How to ensure this new certificate can be trusted?
Can you join the owner through mail or instant messaging? This is the simplest \
way for you to make sure that the server is fine, and maybe alert the owner on \
a problem on his server she did not notice.
"""
def load_cert_stash(stash_path: Path): def get_cert_stash_path() -> Path:
"""Return the default certificate stash path."""
return get_user_data_path() / "known_hosts.txt"
def load_cert_stash(stash_path: Path) -> Optional[Dict]:
"""Load the certificate stash from the file, or None on error. """Load the certificate stash from the file, or None on error.
The stash is a dict with host names as keys and tuples as values. Tuples The stash is a dict with host names as keys and tuples as values. Tuples
@ -36,8 +87,8 @@ def load_cert_stash(stash_path: Path):
match = STASH_LINE_RE.match(line) match = STASH_LINE_RE.match(line)
if not match: if not match:
continue continue
name, algo, fingerprint, timestamp = match.groups() name, algo, fingerprint = match.groups()
stash[name] = (algo, fingerprint, timestamp, True) stash[name] = (algo, fingerprint, True)
except (OSError, ValueError): except (OSError, ValueError):
return None return None
return stash return stash
@ -47,71 +98,67 @@ def save_cert_stash(stash: dict, stash_path: Path):
"""Save the certificate stash.""" """Save the certificate stash."""
try: try:
with open(stash_path, "wt") as stash_file: with open(stash_path, "wt") as stash_file:
for name, entry in stash.values(): for name, entry in stash.items():
algo, fingerprint, timestamp, is_permanent = entry algo, fingerprint, is_permanent = entry
if not is_permanent: if not is_permanent:
continue continue
entry_line = f"{name} {algo} {fingerprint} {timestamp}\n" entry_line = f"{name} {algo} {fingerprint}\n"
stash_file.write(entry_line) stash_file.write(entry_line)
except (OSError, ValueError): except (OSError, ValueError) as exc:
pass print(f"Failed to save certificate stash '{stash_path}': {exc}")
class CertStatus(Enum): class CertStatus(Enum):
"""Value returned by validate_cert.""" """Value returned by validate_cert."""
# Cert is valid: proceed. # Cert is valid: proceed.
VALID = 0 # Known and valid. VALID = 0 # Known and valid.
VALID_NEW = 7 # New and valid. VALID_NEW = 1 # New and valid.
# Cert is unusable or wrong: abort. # Cert is unusable or wrong: abort.
ERROR = 1 # General error. ERROR = 2 # General error.
WRONG_FINGERPRINT = 2 # Fingerprint in the stash is different. WRONG_FINGERPRINT = 3 # Fingerprint in the stash is different.
# Cert has some issues: ask to proceed.
NOT_VALID_YET = 3 # not-before date invalid.
EXPIRED = 4 # not-after date invalid.
BAD_DOMAIN = 5 # Host name is not in cert's valid domains.
CERT_STATUS_INVALID = ( def validate_cert(der, hostname, cert_stash) -> Dict[str, Any]:
CertStatus.NOT_VALID_YET, """Return a dict containing validation info for this certificate.
CertStatus.EXPIRED,
CertStatus.BAD_DOMAIN,
)
Returns:
def validate_cert(der, hostname, cert_stash): The validation dict can contain two keys:
"""Return a tuple (CertStatus, Certificate) for this certificate.""" - status: CertStatus, always present.
- hash: DER hash to be used as certificate fingerprint, present if status is
not CertStatus.ERROR.
- saved_hash: fingerprint for this hostname in the local stash, present if
status is CertStatus.WRONG_FINGERPRINT.
"""
if der is None: if der is None:
return CertStatus.ERROR, None return {"status": CertStatus.ERROR}
try:
cert = asn1crypto.x509.Certificate.load(der)
except ValueError:
return CertStatus.ERROR, None
# Check for sane parameters. known = False
now = datetime.datetime.now(tz=datetime.timezone.utc)
if now < cert.not_valid_before:
return CertStatus.NOT_VALID_YET, cert
if now > cert.not_valid_after:
return CertStatus.EXPIRED, cert
if hostname not in cert.valid_domains:
return CertStatus.BAD_DOMAIN, cert
# Check the entire certificate fingerprint. # Check the entire certificate fingerprint.
cert_hash = hashlib.sha512(der).hexdigest() cert_hash = hashlib.sha512(der).hexdigest()
result = {"hash": cert_hash} # type: Dict[str, Any]
if hostname in cert_stash: if hostname in cert_stash:
_, fingerprint, timestamp, _ = cert_stash[hostname] _, fingerprint, _ = cert_stash[hostname]
if timestamp >= now.timestamp():
if cert_hash != fingerprint: if cert_hash != fingerprint:
return CertStatus.WRONG_FINGERPRINT, cert result.update(
else: status=CertStatus.WRONG_FINGERPRINT,
# Disregard expired fingerprints. saved_hash=fingerprint
pass )
return CertStatus.VALID, cert return result
known = True
# The certificate is unknown and valid. result.update(status=CertStatus.VALID if known else CertStatus.VALID_NEW)
return CertStatus.VALID_NEW, cert return result
def trust(cert_stash, hostname, algo, fingerprint, timestamp, def trust_fingerprint(stash, hostname, algo, fingerprint, trust_always=False):
trust_always=False): """Add a fingerprint entry to this stash."""
cert_stash[hostname] = (algo, fingerprint, timestamp, trust_always) stash[hostname] = (algo, fingerprint, trust_always)
def untrust_fingerprint(stash, hostname):
"""Remove a fingerprint entry from this stash; return True on deletion."""
if hostname in stash:
del stash[hostname]
return True
return False

View file

@ -1 +0,0 @@
asn1crypto