Compare commits
3 commits
6574048763
...
1fddf4c2b2
Author | SHA1 | Date | |
---|---|---|---|
dece | 1fddf4c2b2 | ||
dece | 80ec71f30b | ||
dece | 396391ea80 |
|
@ -12,13 +12,13 @@ TODO DONE
|
||||||
view/edit sources
|
view/edit sources
|
||||||
downloads
|
downloads
|
||||||
configuration
|
configuration
|
||||||
|
help page
|
||||||
|
TOFU
|
||||||
open last download
|
open last download
|
||||||
actual TOFU
|
|
||||||
home page
|
home page
|
||||||
media files
|
media files
|
||||||
view history
|
view history
|
||||||
identity management
|
identity management
|
||||||
help page for keybinds
|
|
||||||
--------------------------------------------------------------------------------
|
--------------------------------------------------------------------------------
|
||||||
BACKLOG
|
BACKLOG
|
||||||
click on links to open them
|
click on links to open them
|
||||||
|
|
10
README.md
10
README.md
|
@ -29,11 +29,13 @@ Why use Bebop instead of something else?
|
||||||
|
|
||||||
### Lightweight
|
### Lightweight
|
||||||
|
|
||||||
It only uses a single dependency, [asn1crypto][asn1crypto], to delegate
|
It does not use any external dependencies. Everything including NCurses or TLS
|
||||||
parsing certificates. Everything else including NCurses or TLS is done using
|
is done using Python's standard library.
|
||||||
Python's standard library.
|
|
||||||
|
|
||||||
[asn1crypto]: https://github.com/wbond/asn1crypto
|
### Nice keybinds
|
||||||
|
|
||||||
|
A lot of keybinds are defined, and Vim users should get quickly familiar with
|
||||||
|
them. Find them in the help page by pressing `?`.
|
||||||
|
|
||||||
### Fun
|
### Fun
|
||||||
|
|
||||||
|
|
|
@ -2,8 +2,8 @@ import argparse
|
||||||
|
|
||||||
from bebop.browser.browser import Browser
|
from bebop.browser.browser import Browser
|
||||||
from bebop.config import load_config
|
from bebop.config import load_config
|
||||||
from bebop.fs import get_config_path, get_user_data_path
|
from bebop.fs import ensure_bebop_files_exist, get_config_path
|
||||||
from bebop.tofu import load_cert_stash, save_cert_stash
|
from bebop.tofu import get_cert_stash_path, load_cert_stash, save_cert_stash
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
|
@ -19,11 +19,9 @@ def main():
|
||||||
config_path = get_config_path()
|
config_path = get_config_path()
|
||||||
config = load_config(config_path)
|
config = load_config(config_path)
|
||||||
|
|
||||||
user_data_path = get_user_data_path()
|
ensure_bebop_files_exist()
|
||||||
if not user_data_path.exists():
|
|
||||||
user_data_path.mkdir()
|
|
||||||
|
|
||||||
cert_stash_path = user_data_path / "known_hosts.txt"
|
cert_stash_path = get_cert_stash_path()
|
||||||
cert_stash = load_cert_stash(cert_stash_path) or {}
|
cert_stash = load_cert_stash(cert_stash_path) or {}
|
||||||
try:
|
try:
|
||||||
Browser(config, cert_stash).run(start_url=start_url)
|
Browser(config, cert_stash).run(start_url=start_url)
|
||||||
|
|
|
@ -13,6 +13,7 @@ from bebop.bookmarks import (
|
||||||
from bebop.colors import ColorPair, init_colors
|
from bebop.colors import ColorPair, init_colors
|
||||||
from bebop.command_line import CommandLine
|
from bebop.command_line import CommandLine
|
||||||
from bebop.external import open_external_program
|
from bebop.external import open_external_program
|
||||||
|
from bebop.help import HELP_PAGE
|
||||||
from bebop.history import History
|
from bebop.history import History
|
||||||
from bebop.links import Links
|
from bebop.links import Links
|
||||||
from bebop.mouse import ButtonState
|
from bebop.mouse import ButtonState
|
||||||
|
@ -23,11 +24,30 @@ from bebop.page_pad import PagePad
|
||||||
|
|
||||||
|
|
||||||
class Browser:
|
class Browser:
|
||||||
"""Manage the events, inputs and rendering."""
|
"""Manage the events, inputs and rendering.
|
||||||
|
|
||||||
|
Attributes:
|
||||||
|
- config: config dict passed to the browser.
|
||||||
|
- stash: certificate stash passed to the browser.
|
||||||
|
- screen: curses stdscr.
|
||||||
|
- dim: current screen dimensions.
|
||||||
|
- page_pad: curses pad containing the current page view.
|
||||||
|
- status_line: curses window used to report current status.
|
||||||
|
- command_line: a CommandLine object for the user to interact with.
|
||||||
|
- running: the browser will continue running while this is true.
|
||||||
|
- status_data: 3-uple of status text, color pair and attributes of the
|
||||||
|
status line, used to reset status after an error.
|
||||||
|
- history: an History object.
|
||||||
|
- cache: a dict containing cached pages
|
||||||
|
- special_pages: a dict containing page names used with "bebop" scheme;
|
||||||
|
values are dicts as well: the "open" key maps to a callable to use when
|
||||||
|
the page is accessed, and the optional "source" key maps to callable
|
||||||
|
returning the page source path.
|
||||||
|
"""
|
||||||
|
|
||||||
def __init__(self, config, cert_stash):
|
def __init__(self, config, cert_stash):
|
||||||
self.config = config
|
self.config = config
|
||||||
self.stash = cert_stash or {}
|
self.stash = cert_stash
|
||||||
self.screen = None
|
self.screen = None
|
||||||
self.dim = (0, 0)
|
self.dim = (0, 0)
|
||||||
self.page_pad = None
|
self.page_pad = None
|
||||||
|
@ -37,6 +57,7 @@ class Browser:
|
||||||
self.status_data = ("", 0, 0)
|
self.status_data = ("", 0, 0)
|
||||||
self.history = History()
|
self.history = History()
|
||||||
self.cache = {}
|
self.cache = {}
|
||||||
|
self.special_pages = self.setup_special_pages()
|
||||||
self._current_url = ""
|
self._current_url = ""
|
||||||
|
|
||||||
@property
|
@property
|
||||||
|
@ -58,6 +79,18 @@ class Browser:
|
||||||
self._current_url = url
|
self._current_url = url
|
||||||
self.set_status(url)
|
self.set_status(url)
|
||||||
|
|
||||||
|
def setup_special_pages(self):
|
||||||
|
"""Return a dict with the special pages functions."""
|
||||||
|
return {
|
||||||
|
"bookmarks": {
|
||||||
|
"open": self.open_bookmarks,
|
||||||
|
"source": lambda: str(get_bookmarks_path())
|
||||||
|
},
|
||||||
|
"help": {
|
||||||
|
"open": self.open_help,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
def run(self, *args, **kwargs):
|
def run(self, *args, **kwargs):
|
||||||
"""Use curses' wrapper around _run."""
|
"""Use curses' wrapper around _run."""
|
||||||
os.environ.setdefault("ESCDELAY", "25")
|
os.environ.setdefault("ESCDELAY", "25")
|
||||||
|
@ -99,7 +132,9 @@ class Browser:
|
||||||
|
|
||||||
def handle_inputs(self):
|
def handle_inputs(self):
|
||||||
char = self.screen.getch()
|
char = self.screen.getch()
|
||||||
if char == ord(":"):
|
if char == ord("?"):
|
||||||
|
self.open_help()
|
||||||
|
elif char == ord(":"):
|
||||||
self.quick_command("")
|
self.quick_command("")
|
||||||
elif char == ord("r"):
|
elif char == ord("r"):
|
||||||
self.reload_page()
|
self.reload_page()
|
||||||
|
@ -218,11 +253,11 @@ class Browser:
|
||||||
|
|
||||||
def quick_command(self, command):
|
def quick_command(self, command):
|
||||||
"""Shortcut method to take user input with a prefixed command string."""
|
"""Shortcut method to take user input with a prefixed command string."""
|
||||||
prefix = f"{command} " if command else ""
|
prefix = command + " " if command else ""
|
||||||
user_input = self.command_line.focus(":", prefix=prefix)
|
text = self.command_line.focus(CommandLine.CHAR_COMMAND, prefix=prefix)
|
||||||
if not user_input:
|
if not text:
|
||||||
return
|
return
|
||||||
self.process_command(user_input)
|
self.process_command(text)
|
||||||
|
|
||||||
def process_command(self, command_text: str):
|
def process_command(self, command_text: str):
|
||||||
"""Handle a client command."""
|
"""Handle a client command."""
|
||||||
|
@ -237,6 +272,9 @@ class Browser:
|
||||||
return
|
return
|
||||||
if command in ("o", "open"):
|
if command in ("o", "open"):
|
||||||
self.open_url(words[1], assume_absolute=True)
|
self.open_url(words[1], assume_absolute=True)
|
||||||
|
elif command == "forget-certificate":
|
||||||
|
from bebop.browser.gemini import forget_certificate
|
||||||
|
forget_certificate(self, words[1])
|
||||||
|
|
||||||
def open_url(self, url, base_url=None, redirects=0, assume_absolute=False,
|
def open_url(self, url, base_url=None, redirects=0, assume_absolute=False,
|
||||||
history=True, use_cache=True):
|
history=True, use_cache=True):
|
||||||
|
@ -287,8 +325,11 @@ class Browser:
|
||||||
from bebop.browser.file import open_file
|
from bebop.browser.file import open_file
|
||||||
open_file(self, parts.path, history=history)
|
open_file(self, parts.path, history=history)
|
||||||
elif parts.scheme == "bebop":
|
elif parts.scheme == "bebop":
|
||||||
if parts.netloc == "bookmarks":
|
special_page = self.special_pages.get(parts.netloc)
|
||||||
self.open_bookmarks()
|
if special_page:
|
||||||
|
special_page["open"]()
|
||||||
|
else:
|
||||||
|
self.set_status_error("Unknown page.")
|
||||||
else:
|
else:
|
||||||
self.set_status_error(f"Protocol {parts.scheme} not supported.")
|
self.set_status_error(f"Protocol {parts.scheme} not supported.")
|
||||||
|
|
||||||
|
@ -442,7 +483,10 @@ class Browser:
|
||||||
return
|
return
|
||||||
self.set_status("Bookmark title?")
|
self.set_status("Bookmark title?")
|
||||||
current_title = self.page_pad.current_page.title or ""
|
current_title = self.page_pad.current_page.title or ""
|
||||||
title = self.command_line.focus(">", prefix=current_title)
|
title = self.command_line.focus(
|
||||||
|
CommandLine.CHAR_TEXT,
|
||||||
|
prefix=current_title
|
||||||
|
)
|
||||||
if title:
|
if title:
|
||||||
title = title.strip()
|
title = title.strip()
|
||||||
if title:
|
if title:
|
||||||
|
@ -458,11 +502,13 @@ class Browser:
|
||||||
directly from their location on disk.
|
directly from their location on disk.
|
||||||
"""
|
"""
|
||||||
delete_source_after = False
|
delete_source_after = False
|
||||||
special_pages = {
|
if self.current_url.startswith("bebop://"):
|
||||||
"bebop://bookmarks": str(get_bookmarks_path())
|
page_name = self.current_url[len("bebop://"):]
|
||||||
}
|
special_pages_functions = self.special_pages.get(page_name)
|
||||||
if self.current_url in special_pages:
|
if not special_pages_functions:
|
||||||
source_filename = special_pages[self.current_url]
|
return
|
||||||
|
get_source = special_pages_functions.get("source")
|
||||||
|
source_filename = get_source() if get_source else None
|
||||||
else:
|
else:
|
||||||
if not self.page_pad.current_page:
|
if not self.page_pad.current_page:
|
||||||
return
|
return
|
||||||
|
@ -472,8 +518,21 @@ class Browser:
|
||||||
source_filename = source_file.name
|
source_filename = source_file.name
|
||||||
delete_source_after = True
|
delete_source_after = True
|
||||||
|
|
||||||
|
if not source_filename:
|
||||||
|
return
|
||||||
|
|
||||||
command = self.config["source_editor"] + [source_filename]
|
command = self.config["source_editor"] + [source_filename]
|
||||||
open_external_program(command)
|
open_external_program(command)
|
||||||
if delete_source_after:
|
if delete_source_after:
|
||||||
os.unlink(source_filename)
|
os.unlink(source_filename)
|
||||||
self.refresh_windows()
|
self.refresh_windows()
|
||||||
|
|
||||||
|
def open_help(self):
|
||||||
|
"""Show the help page."""
|
||||||
|
self.load_page(Page.from_gemtext(HELP_PAGE, self.config["text_width"]))
|
||||||
|
self.current_url = "bebop://help"
|
||||||
|
|
||||||
|
def prompt(self, text, keys):
|
||||||
|
"""Display the text and allow it to type one of the given keys."""
|
||||||
|
self.set_status(text)
|
||||||
|
return self.command_line.prompt_key(keys)
|
||||||
|
|
|
@ -3,10 +3,12 @@
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
from bebop.browser.browser import Browser
|
from bebop.browser.browser import Browser
|
||||||
|
from bebop.command_line import CommandLine
|
||||||
from bebop.fs import get_downloads_path
|
from bebop.fs import get_downloads_path
|
||||||
from bebop.navigation import set_parameter
|
from bebop.navigation import set_parameter
|
||||||
from bebop.page import Page
|
from bebop.page import Page
|
||||||
from bebop.protocol import Request, Response
|
from bebop.protocol import Request, Response
|
||||||
|
from bebop.tofu import trust_fingerprint, untrust_fingerprint, WRONG_FP_ALERT
|
||||||
|
|
||||||
|
|
||||||
MAX_URL_LEN = 1024
|
MAX_URL_LEN = 1024
|
||||||
|
@ -16,10 +18,30 @@ def open_gemini_url(browser: Browser, url, redirects=0, history=True,
|
||||||
use_cache=True):
|
use_cache=True):
|
||||||
"""Open a Gemini URL and set the formatted response as content.
|
"""Open a Gemini URL and set the formatted response as content.
|
||||||
|
|
||||||
After initiating the connection, TODO
|
While the specification is not set in stone, every client takes a slightly
|
||||||
|
different approach to enforcing TOFU. Read the `Request.connect` docs to
|
||||||
|
find about cases where connection is aborted without asking the user. What
|
||||||
|
interests us here is what happens when the user should decide herself? This
|
||||||
|
happens in several cases, matching the request possible states. Here is
|
||||||
|
what Bebop do (or want to do):
|
||||||
|
|
||||||
|
- STATE_INVALID_CERT: the certificate has non-fatal issues; we may
|
||||||
|
present the user the problems found and let her decide whether to trust
|
||||||
|
temporarily the certificate or not BUT we currently do not parse the
|
||||||
|
certificate's fields, so this state is never used.
|
||||||
|
- STATE_UNKNOWN_CERT: the certificate is valid but has not been seen before;
|
||||||
|
as we're doing TOFU here, we could automatically trust it or let the user
|
||||||
|
choose. For simplicity, we always trust it permanently.
|
||||||
|
|
||||||
|
Attributes:
|
||||||
|
- browser: Browser object making the request.
|
||||||
|
- url: a valid URL with Gemini scheme to open.
|
||||||
|
- redirects: current amount of redirections done to open the initial URL.
|
||||||
|
- history: if true, save the final URL to history.
|
||||||
|
- use_cache: if true, look up if the page is cached before requesting it.
|
||||||
"""
|
"""
|
||||||
if len(url) >= MAX_URL_LEN:
|
if len(url) >= MAX_URL_LEN:
|
||||||
browser.set_status_error(f"Request URL too long.")
|
browser.set_status_error("Request URL too long.")
|
||||||
return
|
return
|
||||||
|
|
||||||
browser.set_status(f"Loading {url}")
|
browser.set_status(f"Loading {url}")
|
||||||
|
@ -39,10 +61,10 @@ def open_gemini_url(browser: Browser, url, redirects=0, history=True,
|
||||||
if req.state == Request.STATE_ERROR_CERT:
|
if req.state == Request.STATE_ERROR_CERT:
|
||||||
error = f"Certificate was missing or corrupt ({url})."
|
error = f"Certificate was missing or corrupt ({url})."
|
||||||
elif req.state == Request.STATE_UNTRUSTED_CERT:
|
elif req.state == Request.STATE_UNTRUSTED_CERT:
|
||||||
|
_handle_untrusted_cert(browser, req)
|
||||||
error = f"Certificate has been changed ({url})."
|
error = f"Certificate has been changed ({url})."
|
||||||
# TODO propose the user ways to handle this.
|
|
||||||
elif req.state == Request.STATE_CONNECTION_FAILED:
|
elif req.state == Request.STATE_CONNECTION_FAILED:
|
||||||
error_details = f": {req.error}" if req.error else "."
|
error_details = ": " + req.error if req.error else "."
|
||||||
error = f"Connection failed ({url})" + error_details
|
error = f"Connection failed ({url})" + error_details
|
||||||
else:
|
else:
|
||||||
error = f"Connection failed ({url})."
|
error = f"Connection failed ({url})."
|
||||||
|
@ -50,13 +72,18 @@ def open_gemini_url(browser: Browser, url, redirects=0, history=True,
|
||||||
return
|
return
|
||||||
|
|
||||||
if req.state == Request.STATE_INVALID_CERT:
|
if req.state == Request.STATE_INVALID_CERT:
|
||||||
# TODO propose abort / temp trust
|
|
||||||
pass
|
pass
|
||||||
elif req.state == Request.STATE_UNKNOWN_CERT:
|
elif req.state == Request.STATE_UNKNOWN_CERT:
|
||||||
# TODO propose abort / temp trust / perm trust
|
# Certificate is valid but unknown: trust it permanently.
|
||||||
pass
|
hostname = req.hostname
|
||||||
else:
|
fingerprint = req.cert_validation["hash"]
|
||||||
pass # TODO
|
trust_fingerprint(
|
||||||
|
browser.stash,
|
||||||
|
hostname,
|
||||||
|
"SHA-512",
|
||||||
|
fingerprint,
|
||||||
|
trust_always=True
|
||||||
|
)
|
||||||
|
|
||||||
data = req.proceed()
|
data = req.proceed()
|
||||||
if not data:
|
if not data:
|
||||||
|
@ -67,22 +94,48 @@ def open_gemini_url(browser: Browser, url, redirects=0, history=True,
|
||||||
browser.set_status_error(f"Server response parsing failed ({url}).")
|
browser.set_status_error(f"Server response parsing failed ({url}).")
|
||||||
return
|
return
|
||||||
|
|
||||||
|
_handle_response(browser, response, url, redirects, history)
|
||||||
|
|
||||||
|
|
||||||
|
def _handle_untrusted_cert(browser: Browser, request: Request):
|
||||||
|
"""Handle a mismatch between known & server fingerprints.
|
||||||
|
|
||||||
|
This function formats an alert page to explain to the user what the hell is
|
||||||
|
going on and displays it.
|
||||||
|
"""
|
||||||
|
remote_fp = request.cert_validation["hash"]
|
||||||
|
local_fp = request.cert_validation["saved_hash"]
|
||||||
|
alert_page_source = WRONG_FP_ALERT.format(
|
||||||
|
hostname=request.hostname,
|
||||||
|
local_fp=local_fp,
|
||||||
|
remote_fp=remote_fp,
|
||||||
|
)
|
||||||
|
alert_page = Page.from_gemtext(
|
||||||
|
alert_page_source,
|
||||||
|
browser.config["text_width"]
|
||||||
|
)
|
||||||
|
browser.load_page(alert_page)
|
||||||
|
|
||||||
|
|
||||||
|
def _handle_response(browser: Browser, response: Response, url: str,
|
||||||
|
redirects: int, history: bool):
|
||||||
|
"""Handle a response from a Gemini server."""
|
||||||
if response.code == 20:
|
if response.code == 20:
|
||||||
handle_response_content(browser, url, response, history)
|
_handle_successful_response(browser, response, url, history)
|
||||||
elif response.generic_code == 30 and response.meta:
|
elif response.generic_code == 30 and response.meta:
|
||||||
browser.open_url(response.meta, base_url=url, redirects=redirects + 1)
|
browser.open_url(response.meta, base_url=url, redirects=redirects + 1)
|
||||||
elif response.generic_code in (40, 50):
|
elif response.generic_code in (40, 50):
|
||||||
error = f"Server error: {response.meta or Response.code.name}"
|
error = f"Server error: {response.meta or Response.code.name}"
|
||||||
browser.set_status_error(error)
|
browser.set_status_error(error)
|
||||||
elif response.generic_code == 10:
|
elif response.generic_code == 10:
|
||||||
handle_input_request(browser, url, response.meta)
|
_handle_input_request(browser, url, response.meta)
|
||||||
else:
|
else:
|
||||||
error = f"Unhandled response code {response.code}"
|
error = f"Unhandled response code {response.code}"
|
||||||
browser.set_status_error(error)
|
browser.set_status_error(error)
|
||||||
|
|
||||||
|
|
||||||
def handle_response_content(browser: Browser, url: str, response: Response,
|
def _handle_successful_response(browser: Browser, response: Response, url: str,
|
||||||
history: bool):
|
history: bool):
|
||||||
"""Handle a successful response content from a Gemini server.
|
"""Handle a successful response content from a Gemini server.
|
||||||
|
|
||||||
According to the MIME type received or inferred, the response is either
|
According to the MIME type received or inferred, the response is either
|
||||||
|
@ -116,7 +169,7 @@ def handle_response_content(browser: Browser, url: str, response: Response,
|
||||||
text = response.content.decode("utf-8", errors="replace")
|
text = response.content.decode("utf-8", errors="replace")
|
||||||
page = Page.from_text(text)
|
page = Page.from_text(text)
|
||||||
else:
|
else:
|
||||||
filepath = get_download_path(url)
|
filepath = _get_download_path(url)
|
||||||
|
|
||||||
if page:
|
if page:
|
||||||
browser.load_page(page)
|
browser.load_page(page)
|
||||||
|
@ -137,7 +190,7 @@ def handle_response_content(browser: Browser, url: str, response: Response,
|
||||||
browser.set_status_error(error)
|
browser.set_status_error(error)
|
||||||
|
|
||||||
|
|
||||||
def get_download_path(url: str) -> Path:
|
def _get_download_path(url: str) -> Path:
|
||||||
"""Try to find the best download file path possible from this URL."""
|
"""Try to find the best download file path possible from this URL."""
|
||||||
download_dir = get_downloads_path()
|
download_dir = get_downloads_path()
|
||||||
url_parts = url.rsplit("/", maxsplit=1)
|
url_parts = url.rsplit("/", maxsplit=1)
|
||||||
|
@ -149,14 +202,26 @@ def get_download_path(url: str) -> Path:
|
||||||
return download_dir / filename
|
return download_dir / filename
|
||||||
|
|
||||||
|
|
||||||
def handle_input_request(browser: Browser, from_url: str, message: str =None):
|
def _handle_input_request(browser: Browser, from_url: str, message: str =None):
|
||||||
"""Focus command-line to pass input to the server."""
|
"""Focus command-line to pass input to the server."""
|
||||||
if message:
|
if message:
|
||||||
browser.set_status(f"Input needed: {message}")
|
browser.set_status(f"Input needed: {message}")
|
||||||
else:
|
else:
|
||||||
browser.set_status("Input needed:")
|
browser.set_status("Input needed:")
|
||||||
user_input = browser.command_line.focus("?")
|
user_input = browser.command_line.focus(CommandLine.CHAR_TEXT)
|
||||||
if not user_input:
|
if not user_input:
|
||||||
return
|
return
|
||||||
url = set_parameter(from_url, user_input)
|
url = set_parameter(from_url, user_input)
|
||||||
open_gemini_url(browser, url)
|
open_gemini_url(browser, url)
|
||||||
|
|
||||||
|
|
||||||
|
def forget_certificate(browser: Browser, hostname: str):
|
||||||
|
"""Remove the fingerprint associated to this hostname for the cert stash."""
|
||||||
|
key = browser.prompt(f"Remove fingerprint from {hostname}? [y/N]", "ynN")
|
||||||
|
if key != "y":
|
||||||
|
browser.reset_status()
|
||||||
|
return
|
||||||
|
if untrust_fingerprint(browser.stash, hostname):
|
||||||
|
browser.set_status(f"Known certificate for {hostname} removed.")
|
||||||
|
else:
|
||||||
|
browser.set_status_error(f"Known certificate for {hostname} not found.")
|
||||||
|
|
|
@ -17,18 +17,35 @@ class CommandLine:
|
||||||
the window's right border when writing more content than the width allows.
|
the window's right border when writing more content than the width allows.
|
||||||
Therefore I just added the M-e keybind to call an external editor and use
|
Therefore I just added the M-e keybind to call an external editor and use
|
||||||
its content as result.
|
its content as result.
|
||||||
|
|
||||||
|
Attributes:
|
||||||
|
- window: curses window to use for the command line and Textbox.
|
||||||
|
- editor_command: external command to use to edit content externally.
|
||||||
|
- textbox: Textbox object handling user input.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
CHAR_COMMAND = ":"
|
||||||
|
CHAR_DIGIT = "&"
|
||||||
|
CHAR_TEXT = ">"
|
||||||
|
|
||||||
def __init__(self, window, editor_command):
|
def __init__(self, window, editor_command):
|
||||||
self.window = window
|
self.window = window
|
||||||
self.editor_command = editor_command
|
self.editor_command = editor_command
|
||||||
self.textbox = None
|
self.textbox = curses.textpad.Textbox(self.window)
|
||||||
|
|
||||||
def clear(self):
|
def clear(self):
|
||||||
"""Clear command-line contents."""
|
"""Clear command-line contents."""
|
||||||
self.window.clear()
|
self.window.clear()
|
||||||
self.window.refresh()
|
self.window.refresh()
|
||||||
|
|
||||||
|
def gather(self):
|
||||||
|
"""Return the string currently written by the user in command line.
|
||||||
|
|
||||||
|
This doesn't count the command char used, but it includes then prefix.
|
||||||
|
Trailing whitespace is trimmed.
|
||||||
|
"""
|
||||||
|
return self.textbox.gather()[1:].rstrip()
|
||||||
|
|
||||||
def focus(self, command_char, validator=None, prefix=""):
|
def focus(self, command_char, validator=None, prefix=""):
|
||||||
"""Give user focus to the command bar.
|
"""Give user focus to the command bar.
|
||||||
|
|
||||||
|
@ -46,13 +63,13 @@ class CommandLine:
|
||||||
User input as string. The string will be empty if the validator raised
|
User input as string. The string will be empty if the validator raised
|
||||||
an EscapeInterrupt.
|
an EscapeInterrupt.
|
||||||
"""
|
"""
|
||||||
|
validator = validator or self._validate_common_input
|
||||||
self.window.clear()
|
self.window.clear()
|
||||||
self.window.refresh()
|
self.window.refresh()
|
||||||
self.textbox = curses.textpad.Textbox(self.window)
|
|
||||||
self.window.addstr(command_char + prefix)
|
self.window.addstr(command_char + prefix)
|
||||||
curses.curs_set(1)
|
curses.curs_set(1)
|
||||||
try:
|
try:
|
||||||
command = self.textbox.edit(validator or self.validate_common_input)
|
command = self.textbox.edit(validator)
|
||||||
except EscapeCommandInterrupt:
|
except EscapeCommandInterrupt:
|
||||||
command = ""
|
command = ""
|
||||||
except TerminateCommandInterrupt as exc:
|
except TerminateCommandInterrupt as exc:
|
||||||
|
@ -63,11 +80,7 @@ class CommandLine:
|
||||||
self.clear()
|
self.clear()
|
||||||
return command
|
return command
|
||||||
|
|
||||||
def gather(self):
|
def _validate_common_input(self, ch: int):
|
||||||
"""Return the string currently written by the user in command line."""
|
|
||||||
return self.textbox.gather()[1:].rstrip()
|
|
||||||
|
|
||||||
def validate_common_input(self, ch: int):
|
|
||||||
"""Generic input validator, handles a few more cases than default.
|
"""Generic input validator, handles a few more cases than default.
|
||||||
|
|
||||||
This validator can be used as a default validator as it handles, on top
|
This validator can be used as a default validator as it handles, on top
|
||||||
|
@ -136,8 +149,8 @@ class CommandLine:
|
||||||
if len(candidates) == 1:
|
if len(candidates) == 1:
|
||||||
return 0, candidates[0]
|
return 0, candidates[0]
|
||||||
# Else, focus the command line to let the user input more digits.
|
# Else, focus the command line to let the user input more digits.
|
||||||
validator = lambda ch: self.validate_link_digit(ch, links, max_digits)
|
validator = lambda ch: self._validate_link_digit(ch, links, max_digits)
|
||||||
link_input = self.focus("&", validator, digit)
|
link_input = self.focus(CommandLine.CHAR_DIGIT, validator, digit)
|
||||||
if not link_input:
|
if not link_input:
|
||||||
return 1, None
|
return 1, None
|
||||||
try:
|
try:
|
||||||
|
@ -146,10 +159,10 @@ class CommandLine:
|
||||||
return 2, f"Invalid link ID {link_input}."
|
return 2, f"Invalid link ID {link_input}."
|
||||||
return 0, link_id
|
return 0, link_id
|
||||||
|
|
||||||
def validate_link_digit(self, ch: int, links: Links, max_digits: int):
|
def _validate_link_digit(self, ch: int, links: Links, max_digits: int):
|
||||||
"""Handle input chars to be used as link ID."""
|
"""Handle input chars to be used as link ID."""
|
||||||
# Handle common chars.
|
# Handle common chars.
|
||||||
ch = self.validate_common_input(ch)
|
ch = self._validate_common_input(ch)
|
||||||
# Only accept digits. If we reach the amount of required digits, open
|
# Only accept digits. If we reach the amount of required digits, open
|
||||||
# link now and leave command line. Else just process it.
|
# link now and leave command line. Else just process it.
|
||||||
if curses.ascii.isdigit(ch):
|
if curses.ascii.isdigit(ch):
|
||||||
|
@ -185,6 +198,25 @@ class CommandLine:
|
||||||
return
|
return
|
||||||
raise TerminateCommandInterrupt(content)
|
raise TerminateCommandInterrupt(content)
|
||||||
|
|
||||||
|
def prompt_key(self, keys):
|
||||||
|
"""Focus the command line and wait for the user """
|
||||||
|
validator = lambda ch: self._validate_prompt(ch, keys)
|
||||||
|
key = self.focus(CommandLine.CHAR_TEXT, validator)
|
||||||
|
return key if key in keys else ""
|
||||||
|
|
||||||
|
def _validate_prompt(self, ch: int, keys):
|
||||||
|
"""Handle input chars and raise a terminate interrupt on a valid key."""
|
||||||
|
# Handle common keys.
|
||||||
|
ch = self._validate_common_input(ch)
|
||||||
|
try:
|
||||||
|
char = chr(ch)
|
||||||
|
except ValueError:
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
if char in keys:
|
||||||
|
raise TerminateCommandInterrupt(char)
|
||||||
|
return 0
|
||||||
|
|
||||||
|
|
||||||
class EscapeCommandInterrupt(Exception):
|
class EscapeCommandInterrupt(Exception):
|
||||||
"""Signal that ESC has been pressed during command line."""
|
"""Signal that ESC has been pressed during command line."""
|
||||||
|
|
|
@ -45,3 +45,11 @@ def get_downloads_path() -> Path:
|
||||||
if download_path:
|
if download_path:
|
||||||
return Path(download_path)
|
return Path(download_path)
|
||||||
return Path.home()
|
return Path.home()
|
||||||
|
|
||||||
|
|
||||||
|
def ensure_bebop_files_exist():
|
||||||
|
"""Ensure various Bebop's files or directories are present."""
|
||||||
|
# Ensure the user data directory exists.
|
||||||
|
user_data_path = get_user_data_path()
|
||||||
|
if not user_data_path.exists():
|
||||||
|
user_data_path.mkdir(parents=True)
|
||||||
|
|
38
bebop/help.py
Normal file
38
bebop/help.py
Normal file
|
@ -0,0 +1,38 @@
|
||||||
|
"""Help page. Currently only keybinds are shown as help."""
|
||||||
|
|
||||||
|
HELP_PAGE = """\
|
||||||
|
# Bebop keybinds
|
||||||
|
|
||||||
|
Keybinds using the SHIFT key are written uppercase. Keybinds using the ALT (or \
|
||||||
|
META) key are written using the "M-" prefix. Symbol keys are written as their \
|
||||||
|
name, not the symbol itself.
|
||||||
|
|
||||||
|
``` list of bebop keybinds
|
||||||
|
- colon: focus the command-line
|
||||||
|
- r: reload page
|
||||||
|
- h: scroll left a bit
|
||||||
|
- j: scroll down a bit
|
||||||
|
- k: scroll up a bit
|
||||||
|
- l: scroll right a bit
|
||||||
|
- H: scroll left a whole page
|
||||||
|
- J: scroll down a whole page
|
||||||
|
- K: scroll up a whole page
|
||||||
|
- L: scroll right a whole page
|
||||||
|
- M-h: scroll one column left
|
||||||
|
- M-j: scroll one line down
|
||||||
|
- M-k: scroll one line up
|
||||||
|
- M-l: scroll one column right
|
||||||
|
- circumflex: horizontally scroll back to the first column
|
||||||
|
- gg: go to the top of the page
|
||||||
|
- G: go to the bottom of the page
|
||||||
|
- o: open an URL
|
||||||
|
- p: go to the previous page
|
||||||
|
- u: go to the parent page (up a level in URL)
|
||||||
|
- U: go to the root page (root URL for the current domain)
|
||||||
|
- b: open bookmarks
|
||||||
|
- B: add current page to bookmarks
|
||||||
|
- e: open the current page source in an editor
|
||||||
|
- digits: go to the corresponding link ID
|
||||||
|
- escape: reset status line text
|
||||||
|
```
|
||||||
|
"""
|
|
@ -8,19 +8,13 @@ from enum import IntEnum
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
|
|
||||||
from bebop.mime import DEFAULT_MIME_TYPE, MimeType
|
from bebop.mime import DEFAULT_MIME_TYPE, MimeType
|
||||||
from bebop.tofu import CertStatus, CERT_STATUS_INVALID, validate_cert
|
from bebop.tofu import CertStatus, validate_cert
|
||||||
|
|
||||||
|
|
||||||
GEMINI_URL_RE = re.compile(r"gemini://(?P<host>[^/]+)(?P<path>.*)")
|
GEMINI_URL_RE = re.compile(r"gemini://(?P<host>[^/]+)(?P<path>.*)")
|
||||||
LINE_TERM = b"\r\n"
|
LINE_TERM = b"\r\n"
|
||||||
|
|
||||||
|
|
||||||
def parse_gemini_url(url):
|
|
||||||
"""Return a dict containing the hostname and the request path, or None."""
|
|
||||||
match = GEMINI_URL_RE.match(url)
|
|
||||||
return match.groupdict() if match else None
|
|
||||||
|
|
||||||
|
|
||||||
class Request:
|
class Request:
|
||||||
"""A Gemini request.
|
"""A Gemini request.
|
||||||
|
|
||||||
|
@ -30,8 +24,21 @@ class Request:
|
||||||
sending the request header and receiving the response:
|
sending the request header and receiving the response:
|
||||||
|
|
||||||
1. Instantiate a Request.
|
1. Instantiate a Request.
|
||||||
2. `connect` opens the connection, leaves the caller free to check stuff.
|
2. `connect` opens the connection and aborts it or leaves the caller free to
|
||||||
|
check stuff.
|
||||||
3. `proceed` or `abort` can be called.
|
3. `proceed` or `abort` can be called.
|
||||||
|
|
||||||
|
Attributes:
|
||||||
|
- url: URL to open.
|
||||||
|
- cert_stash: certificate stash to use an possibly update.
|
||||||
|
- state: request state.
|
||||||
|
- hostname: hostname derived from url, stored when `connect` is called.
|
||||||
|
- payload: bytes object of the payload request; build during `connect`, used
|
||||||
|
during `proceed`.
|
||||||
|
- ssock: TLS-wrapped socket.
|
||||||
|
- cert_validation: validation results dict, set after certificate has been
|
||||||
|
reviewed.
|
||||||
|
- error: human-readable connection error, may be set during `connect`.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
# Initial state, connection is not established yet.
|
# Initial state, connection is not established yet.
|
||||||
|
@ -55,28 +62,69 @@ class Request:
|
||||||
self.url = url
|
self.url = url
|
||||||
self.cert_stash = cert_stash
|
self.cert_stash = cert_stash
|
||||||
self.state = Request.STATE_INIT
|
self.state = Request.STATE_INIT
|
||||||
|
self.hostname = ""
|
||||||
self.payload = b""
|
self.payload = b""
|
||||||
self.ssock = None
|
self.ssock = None
|
||||||
self.cert = None
|
self.cert_validation = None
|
||||||
self.cert_status = None
|
|
||||||
self.error = ""
|
self.error = ""
|
||||||
|
|
||||||
def connect(self, timeout):
|
def connect(self, timeout: int) -> bool:
|
||||||
"""Connect to a Gemini server and return a RequestEventType.
|
"""Connect to a Gemini server and return a RequestEventType.
|
||||||
|
|
||||||
Return True if the connection is established. The caller has to verify
|
Return True if the connection is established. The caller has to verify
|
||||||
the request state and propose appropriate choices to the user if the
|
the request state and propose appropriate choices to the user if the
|
||||||
certificate status is not CertStatus.VALID (Request.STATE_OK).
|
certificate status is not CertStatus.VALID (Request.STATE_OK).
|
||||||
|
|
||||||
If connect returns False, the secure socket is aborted before return. If
|
If connect returns False, the secure socket is aborted before return so
|
||||||
connect returns True, it is up to the caller to decide whether to
|
there is no need to call `abort`. If connect returns True, it is up to the
|
||||||
continue (call proceed) the connection or abort it (call abort).
|
caller to decide whether to continue (call `proceed`) the connection or
|
||||||
|
abort it (call `abort`).
|
||||||
|
|
||||||
|
The request `state` is updated to reflect the connection state after the
|
||||||
|
function returns. The following list describes states related to
|
||||||
|
connection failure (False returned):
|
||||||
|
|
||||||
|
- STATE_INVALID_URL: URL is not valid.
|
||||||
|
- STATE_CONNECTION_FAILED: connection failed, either TCP timeout or
|
||||||
|
local TLS failure. Additionally, the request `error` attribute is set
|
||||||
|
to an error string describing the issue.
|
||||||
|
|
||||||
|
For all request states from now on, the `cert_validation` attribute is
|
||||||
|
updated with the result of the certificate validation.
|
||||||
|
|
||||||
|
The following list describes states related to validation failure (False
|
||||||
|
returned):
|
||||||
|
|
||||||
|
- STATE_ERROR_CERT: server certificate could not be validated at all.
|
||||||
|
- STATE_UNTRUSTED_CERT: server certificate mismatched the known
|
||||||
|
certificate for that hostname. The user should be presented with
|
||||||
|
options to solve the matter.
|
||||||
|
|
||||||
|
For other states, the connection is not aborted (True returned):
|
||||||
|
|
||||||
|
- STATE_INVALID_CERT: the certificate has one or more issues, e.g.
|
||||||
|
mismatching hostname or it is expired.
|
||||||
|
- STATE_UNKNOWN_CERT: the certificate is valid but unknown.
|
||||||
|
- STATE_OK: the certificate is valid and matches the known certificate
|
||||||
|
of that hostname.
|
||||||
|
|
||||||
|
After this function returns, the request state cannot be STATE_INIT.
|
||||||
|
|
||||||
|
Additional notes:
|
||||||
|
|
||||||
|
- The DER hash is compared against the fingerprint for this hostname
|
||||||
|
*and port*; the specification does not tell much about that, but we
|
||||||
|
are slightly more restrictive here by adding the port in the equation.
|
||||||
|
- The state STATE_INVALID_CERT is actually never used in Bebop because
|
||||||
|
of the current tendency to ignore any certificate fields and only
|
||||||
|
check the whole cert fingerprint. Here it is considered the same as a
|
||||||
|
valid certificate.
|
||||||
"""
|
"""
|
||||||
url_parts = parse_gemini_url(self.url)
|
url_parts = GEMINI_URL_RE.match(self.url)
|
||||||
if not url_parts:
|
if not url_parts:
|
||||||
self.state = Request.STATE_INVALID_URL
|
self.state = Request.STATE_INVALID_URL
|
||||||
return False
|
return False
|
||||||
hostname = url_parts["host"]
|
hostname = url_parts.groupdict()["host"]
|
||||||
if ":" in hostname:
|
if ":" in hostname:
|
||||||
hostname, port = hostname.split(":", maxsplit=1)
|
hostname, port = hostname.split(":", maxsplit=1)
|
||||||
try:
|
try:
|
||||||
|
@ -86,6 +134,7 @@ class Request:
|
||||||
return False
|
return False
|
||||||
else:
|
else:
|
||||||
port = 1965
|
port = 1965
|
||||||
|
self.hostname = hostname
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self.payload = self.url.encode()
|
self.payload = self.url.encode()
|
||||||
|
@ -105,27 +154,26 @@ class Request:
|
||||||
try:
|
try:
|
||||||
self.ssock = context.wrap_socket(sock, server_hostname=hostname)
|
self.ssock = context.wrap_socket(sock, server_hostname=hostname)
|
||||||
except OSError as exc:
|
except OSError as exc:
|
||||||
|
sock.close()
|
||||||
self.state = Request.STATE_CONNECTION_FAILED
|
self.state = Request.STATE_CONNECTION_FAILED
|
||||||
self.error = exc.strerror
|
self.error = exc.strerror
|
||||||
return False
|
return False
|
||||||
|
|
||||||
der = self.ssock.getpeercert(binary_form=True)
|
der = self.ssock.getpeercert(binary_form=True)
|
||||||
self.cert_status, self.cert = \
|
self.cert_validation = validate_cert(der, hostname, self.cert_stash)
|
||||||
validate_cert(der, hostname, self.cert_stash)
|
cert_status = self.cert_validation["status"]
|
||||||
if self.cert_status == CertStatus.ERROR:
|
if cert_status == CertStatus.ERROR:
|
||||||
self.abort()
|
self.abort()
|
||||||
self.state = Request.STATE_ERROR_CERT
|
self.state = Request.STATE_ERROR_CERT
|
||||||
return False
|
return False
|
||||||
if self.cert_status == CertStatus.WRONG_FINGERPRINT:
|
if cert_status == CertStatus.WRONG_FINGERPRINT:
|
||||||
self.abort()
|
self.abort()
|
||||||
self.state = Request.STATE_UNTRUSTED_CERT
|
self.state = Request.STATE_UNTRUSTED_CERT
|
||||||
return False
|
return False
|
||||||
|
|
||||||
if self.cert_status in CERT_STATUS_INVALID:
|
if cert_status == CertStatus.VALID_NEW:
|
||||||
self.state = Request.STATE_INVALID_CERT
|
|
||||||
elif self.cert_status == CertStatus.VALID_NEW:
|
|
||||||
self.state = Request.STATE_UNKNOWN_CERT
|
self.state = Request.STATE_UNKNOWN_CERT
|
||||||
else: # self.cert_status == CertStatus.VALID
|
else: # self.cert_status in (VALID, VALID_NEW, INVALID_CERT)
|
||||||
self.state = Request.STATE_OK
|
self.state = Request.STATE_OK
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
@ -232,6 +280,6 @@ class Response:
|
||||||
return response
|
return response
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def get_generic_code(code) -> int:
|
def get_generic_code(code: int) -> int:
|
||||||
"""Return the generic version (x0) of this code."""
|
"""Return the generic version (x0) of this code."""
|
||||||
return code - (code % 10)
|
return code - (code % 10)
|
||||||
|
|
|
@ -1,9 +1,9 @@
|
||||||
import unittest
|
import unittest
|
||||||
|
|
||||||
from ..rendering import _explode_words, _find_next_sep, wrap_words
|
from ..metalines import _explode_words, _find_next_sep, wrap_words
|
||||||
|
|
||||||
|
|
||||||
class TestRenderer(unittest.TestCase):
|
class TestMetalines(unittest.TestCase):
|
||||||
|
|
||||||
def test_wrap_words(self):
|
def test_wrap_words(self):
|
||||||
t = "wrap me wrap me youcantwrapthisonewithoutforce bla bla bla bla"
|
t = "wrap me wrap me youcantwrapthisonewithoutforce bla bla bla bla"
|
|
@ -1,10 +0,0 @@
|
||||||
import unittest
|
|
||||||
|
|
||||||
from ..protocol import parse_gemini_url
|
|
||||||
|
|
||||||
|
|
||||||
class TestGemini(unittest.TestCase):
|
|
||||||
|
|
||||||
def test_parse_url(self):
|
|
||||||
r1 = parse_gemini_url("gemini://dece.space")
|
|
||||||
self.assertDictEqual(r1, {"host": "dece.space", "path": ""})
|
|
151
bebop/tofu.py
151
bebop/tofu.py
|
@ -4,19 +4,70 @@ As of writing there is still some debate around it, so it is quite messy and
|
||||||
requires more clarity both in specification and in our own implementation.
|
requires more clarity both in specification and in our own implementation.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import datetime
|
|
||||||
import hashlib
|
import hashlib
|
||||||
import re
|
import re
|
||||||
from enum import Enum
|
from enum import Enum
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
from typing import Any, Dict, Optional
|
||||||
|
|
||||||
import asn1crypto.x509
|
from bebop.fs import get_user_data_path
|
||||||
|
|
||||||
|
|
||||||
STASH_LINE_RE = re.compile(r"(\S+) (\S+) (\S+) (\d+)")
|
STASH_LINE_RE = re.compile(r"(\S+) (\S+) (\S+)")
|
||||||
|
|
||||||
|
WRONG_FP_ALERT = """\
|
||||||
|
The request could not complete because the certificate presented by the server \
|
||||||
|
does not match the certificate stored in the local stash.
|
||||||
|
|
||||||
|
``` details of the fingerprint mismatch
|
||||||
|
Hostname: {hostname}
|
||||||
|
Local fingerprint: {local_fp}
|
||||||
|
Server fingerprint: {remote_fp}
|
||||||
|
```
|
||||||
|
|
||||||
|
If you are sure this new certificate can be trusted, press ":" and type the \
|
||||||
|
following command to remove the previous certificate from the local stash, \
|
||||||
|
then retry your request:
|
||||||
|
|
||||||
|
``` command to use to forget about the previous certificate
|
||||||
|
forget-certificate {hostname}
|
||||||
|
```
|
||||||
|
|
||||||
|
You can also manually remove the certificate line from the known hosts file in \
|
||||||
|
your user data directory.
|
||||||
|
|
||||||
|
## FAQ
|
||||||
|
|
||||||
|
### What is this mismatch about?
|
||||||
|
|
||||||
|
Gemini uses TOFU (Trust On First Use) to verify the identity of the server you \
|
||||||
|
are visiting. It means that the first time you visited this capsule, it showed \
|
||||||
|
you its unique ID, but this time the ID is different, so the trust is broken.
|
||||||
|
|
||||||
|
Capsule owners often tell in advance when they are about the use a new \
|
||||||
|
certificate, but they may have forgotten or you may have missed it. Maybe the \
|
||||||
|
old certificate expired and/or has been replaced for another reason (e.g. \
|
||||||
|
using a far away expiration time, borking certificates during a migration, …)
|
||||||
|
|
||||||
|
### Am I being hacked?
|
||||||
|
|
||||||
|
Probably not, but if you are visiting a sensitive capsule, make sure you're \
|
||||||
|
confident enough before trusting this new certificate.
|
||||||
|
|
||||||
|
### How to ensure this new certificate can be trusted?
|
||||||
|
|
||||||
|
Can you join the owner through mail or instant messaging? This is the simplest \
|
||||||
|
way for you to make sure that the server is fine, and maybe alert the owner on \
|
||||||
|
a problem on his server she did not notice.
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
def load_cert_stash(stash_path: Path):
|
def get_cert_stash_path() -> Path:
|
||||||
|
"""Return the default certificate stash path."""
|
||||||
|
return get_user_data_path() / "known_hosts.txt"
|
||||||
|
|
||||||
|
|
||||||
|
def load_cert_stash(stash_path: Path) -> Optional[Dict]:
|
||||||
"""Load the certificate stash from the file, or None on error.
|
"""Load the certificate stash from the file, or None on error.
|
||||||
|
|
||||||
The stash is a dict with host names as keys and tuples as values. Tuples
|
The stash is a dict with host names as keys and tuples as values. Tuples
|
||||||
|
@ -36,8 +87,8 @@ def load_cert_stash(stash_path: Path):
|
||||||
match = STASH_LINE_RE.match(line)
|
match = STASH_LINE_RE.match(line)
|
||||||
if not match:
|
if not match:
|
||||||
continue
|
continue
|
||||||
name, algo, fingerprint, timestamp = match.groups()
|
name, algo, fingerprint = match.groups()
|
||||||
stash[name] = (algo, fingerprint, timestamp, True)
|
stash[name] = (algo, fingerprint, True)
|
||||||
except (OSError, ValueError):
|
except (OSError, ValueError):
|
||||||
return None
|
return None
|
||||||
return stash
|
return stash
|
||||||
|
@ -47,71 +98,67 @@ def save_cert_stash(stash: dict, stash_path: Path):
|
||||||
"""Save the certificate stash."""
|
"""Save the certificate stash."""
|
||||||
try:
|
try:
|
||||||
with open(stash_path, "wt") as stash_file:
|
with open(stash_path, "wt") as stash_file:
|
||||||
for name, entry in stash.values():
|
for name, entry in stash.items():
|
||||||
algo, fingerprint, timestamp, is_permanent = entry
|
algo, fingerprint, is_permanent = entry
|
||||||
if not is_permanent:
|
if not is_permanent:
|
||||||
continue
|
continue
|
||||||
entry_line = f"{name} {algo} {fingerprint} {timestamp}\n"
|
entry_line = f"{name} {algo} {fingerprint}\n"
|
||||||
stash_file.write(entry_line)
|
stash_file.write(entry_line)
|
||||||
except (OSError, ValueError):
|
except (OSError, ValueError) as exc:
|
||||||
pass
|
print(f"Failed to save certificate stash '{stash_path}': {exc}")
|
||||||
|
|
||||||
|
|
||||||
class CertStatus(Enum):
|
class CertStatus(Enum):
|
||||||
"""Value returned by validate_cert."""
|
"""Value returned by validate_cert."""
|
||||||
# Cert is valid: proceed.
|
# Cert is valid: proceed.
|
||||||
VALID = 0 # Known and valid.
|
VALID = 0 # Known and valid.
|
||||||
VALID_NEW = 7 # New and valid.
|
VALID_NEW = 1 # New and valid.
|
||||||
# Cert is unusable or wrong: abort.
|
# Cert is unusable or wrong: abort.
|
||||||
ERROR = 1 # General error.
|
ERROR = 2 # General error.
|
||||||
WRONG_FINGERPRINT = 2 # Fingerprint in the stash is different.
|
WRONG_FINGERPRINT = 3 # Fingerprint in the stash is different.
|
||||||
# Cert has some issues: ask to proceed.
|
|
||||||
NOT_VALID_YET = 3 # not-before date invalid.
|
|
||||||
EXPIRED = 4 # not-after date invalid.
|
|
||||||
BAD_DOMAIN = 5 # Host name is not in cert's valid domains.
|
|
||||||
|
|
||||||
|
|
||||||
CERT_STATUS_INVALID = (
|
def validate_cert(der, hostname, cert_stash) -> Dict[str, Any]:
|
||||||
CertStatus.NOT_VALID_YET,
|
"""Return a dict containing validation info for this certificate.
|
||||||
CertStatus.EXPIRED,
|
|
||||||
CertStatus.BAD_DOMAIN,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
Returns:
|
||||||
def validate_cert(der, hostname, cert_stash):
|
The validation dict can contain two keys:
|
||||||
"""Return a tuple (CertStatus, Certificate) for this certificate."""
|
- status: CertStatus, always present.
|
||||||
|
- hash: DER hash to be used as certificate fingerprint, present if status is
|
||||||
|
not CertStatus.ERROR.
|
||||||
|
- saved_hash: fingerprint for this hostname in the local stash, present if
|
||||||
|
status is CertStatus.WRONG_FINGERPRINT.
|
||||||
|
"""
|
||||||
if der is None:
|
if der is None:
|
||||||
return CertStatus.ERROR, None
|
return {"status": CertStatus.ERROR}
|
||||||
try:
|
|
||||||
cert = asn1crypto.x509.Certificate.load(der)
|
|
||||||
except ValueError:
|
|
||||||
return CertStatus.ERROR, None
|
|
||||||
|
|
||||||
# Check for sane parameters.
|
known = False
|
||||||
now = datetime.datetime.now(tz=datetime.timezone.utc)
|
|
||||||
if now < cert.not_valid_before:
|
|
||||||
return CertStatus.NOT_VALID_YET, cert
|
|
||||||
if now > cert.not_valid_after:
|
|
||||||
return CertStatus.EXPIRED, cert
|
|
||||||
if hostname not in cert.valid_domains:
|
|
||||||
return CertStatus.BAD_DOMAIN, cert
|
|
||||||
|
|
||||||
# Check the entire certificate fingerprint.
|
# Check the entire certificate fingerprint.
|
||||||
cert_hash = hashlib.sha512(der).hexdigest()
|
cert_hash = hashlib.sha512(der).hexdigest()
|
||||||
|
result = {"hash": cert_hash} # type: Dict[str, Any]
|
||||||
if hostname in cert_stash:
|
if hostname in cert_stash:
|
||||||
_, fingerprint, timestamp, _ = cert_stash[hostname]
|
_, fingerprint, _ = cert_stash[hostname]
|
||||||
if timestamp >= now.timestamp():
|
if cert_hash != fingerprint:
|
||||||
if cert_hash != fingerprint:
|
result.update(
|
||||||
return CertStatus.WRONG_FINGERPRINT, cert
|
status=CertStatus.WRONG_FINGERPRINT,
|
||||||
else:
|
saved_hash=fingerprint
|
||||||
# Disregard expired fingerprints.
|
)
|
||||||
pass
|
return result
|
||||||
return CertStatus.VALID, cert
|
known = True
|
||||||
|
|
||||||
# The certificate is unknown and valid.
|
result.update(status=CertStatus.VALID if known else CertStatus.VALID_NEW)
|
||||||
return CertStatus.VALID_NEW, cert
|
return result
|
||||||
|
|
||||||
|
|
||||||
def trust(cert_stash, hostname, algo, fingerprint, timestamp,
|
def trust_fingerprint(stash, hostname, algo, fingerprint, trust_always=False):
|
||||||
trust_always=False):
|
"""Add a fingerprint entry to this stash."""
|
||||||
cert_stash[hostname] = (algo, fingerprint, timestamp, trust_always)
|
stash[hostname] = (algo, fingerprint, trust_always)
|
||||||
|
|
||||||
|
|
||||||
|
def untrust_fingerprint(stash, hostname):
|
||||||
|
"""Remove a fingerprint entry from this stash; return True on deletion."""
|
||||||
|
if hostname in stash:
|
||||||
|
del stash[hostname]
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
|
@ -1 +0,0 @@
|
||||||
asn1crypto
|
|
Reference in a new issue