Compare commits

..

6 commits

Author SHA1 Message Date
dece 62997b9385 help: display current config 2021-05-13 01:26:50 +02:00
dece f63d875fc3 logging 2021-05-13 01:25:50 +02:00
dece 8b1561e689 identity: present cert instead of waiting for 60 2021-05-13 01:24:29 +02:00
dece 57f01720d6 identity: add basic identity management 2021-05-12 22:34:18 +02:00
dece 6ceb75b84c config: add download_path 2021-05-09 23:02:56 +02:00
dece 056616b130 page_pad: fix crashes on some fail refresh
would have been better to understand why it crashed but yolo
2021-05-09 15:55:27 +02:00
16 changed files with 491 additions and 82 deletions

View file

@ -17,12 +17,22 @@ TODO DONE
view history view history
open last download open last download
media files media files
identity management
logging
home page home page
identity management
--------------------------------------------------------------------------------
BACKLOG BACKLOG
--------------------------------------------------------------------------------
bug: can't input unicode
click on links to open them click on links to open them
download to disk, not in memory bug: can't reload bebop: pages
bug: exiting editor breaks curses
dumb rendering mode per site
disable cache per site
well, preferences per site maybe?
download without memory buffer
download in the background download in the background
download view instead of last download download view instead of last download
does encoding really work? cf. egsam does encoding really work? cf. egsam
@ -35,5 +45,7 @@ bug: combining chars reduce lengths
non shit command-line non shit command-line
response code 11 (if still there) response code 11 (if still there)
gopher? gopher?
save history opt. maintain history between sessions
history (forward) (useful?) history (forward) (useful?)
search in page (ugh)
get/set config using command-line

View file

@ -77,6 +77,7 @@ Here are the available options:
|----------------------------|--------------|----------------|---------------------------------------| |----------------------------|--------------|----------------|---------------------------------------|
| `connect_timeout` | int | 10 | Seconds before connection times out. | | `connect_timeout` | int | 10 | Seconds before connection times out. |
| `text_width` | int | 80 | Rendered line length. | | `text_width` | int | 80 | Rendered line length. |
| `download_path` | string | | Download path. |
| `source_editor` | string list | `["vi"]` | Command to use for editing sources. | | `source_editor` | string list | `["vi"]` | Command to use for editing sources. |
| `command_editor` | string list | `["vi"]` | Command to use for editing CLI input. | | `command_editor` | string list | `["vi"]` | Command to use for editing CLI input. |
| `history_limit` | int | 1000 | Maximum entries in history. | | `history_limit` | int | 1000 | Maximum entries in history. |

View file

@ -1,4 +1,5 @@
import argparse import argparse
import logging
from bebop.browser.browser import Browser from bebop.browser.browser import Browser
from bebop.config import load_config from bebop.config import load_config
@ -9,8 +10,18 @@ from bebop.tofu import get_cert_stash_path, load_cert_stash, save_cert_stash
def main(): def main():
argparser = argparse.ArgumentParser() argparser = argparse.ArgumentParser()
argparser.add_argument("url", nargs="?", default=None) argparser.add_argument("url", nargs="?", default=None)
argparser.add_argument("-d", "--debug", action="store_true")
args = argparser.parse_args() args = argparser.parse_args()
if args.debug:
logging.basicConfig(
filename="bebop.log",
level=logging.DEBUG,
format="%(asctime)s %(levelname)-8s %(message)s"
)
else:
logging.disable()
if args.url: if args.url:
start_url = args.url start_url = args.url
else: else:
@ -19,7 +30,10 @@ def main():
config_path = get_config_path() config_path = get_config_path()
config = load_config(config_path) config = load_config(config_path)
ensure_bebop_files_exist() bebop_files_error = ensure_bebop_files_exist()
if bebop_files_error:
logging.critical(f"Failed to create files: {bebop_files_error}")
return
cert_stash_path = get_cert_stash_path() cert_stash_path = get_cert_stash_path()
cert_stash = load_cert_stash(cert_stash_path) or {} cert_stash = load_cert_stash(cert_stash_path) or {}

View file

@ -3,12 +3,13 @@
import curses import curses
import curses.ascii import curses.ascii
import curses.textpad import curses.textpad
import logging
import os import os
import subprocess import subprocess
import tempfile import tempfile
from math import inf from math import inf
from pathlib import Path from pathlib import Path
from typing import Optional, Tuple from typing import Dict, Optional, Tuple
from bebop.bookmarks import ( from bebop.bookmarks import (
get_bookmarks_path, get_bookmarks_document, save_bookmark get_bookmarks_path, get_bookmarks_document, save_bookmark
@ -16,8 +17,10 @@ from bebop.bookmarks import (
from bebop.colors import ColorPair, init_colors from bebop.colors import ColorPair, init_colors
from bebop.command_line import CommandLine from bebop.command_line import CommandLine
from bebop.external import open_external_program from bebop.external import open_external_program
from bebop.help import HELP_PAGE from bebop.fs import get_identities_list_path
from bebop.help import get_help
from bebop.history import History from bebop.history import History
from bebop.identity import load_identities
from bebop.links import Links from bebop.links import Links
from bebop.mime import MimeType from bebop.mime import MimeType
from bebop.mouse import ButtonState from bebop.mouse import ButtonState
@ -41,14 +44,15 @@ class Browser:
- command_line: a CommandLine object for the user to interact with. - command_line: a CommandLine object for the user to interact with.
- running: the browser will continue running while this is true. - running: the browser will continue running while this is true.
- status_data: 3-uple of status text, color pair and attributes of the - status_data: 3-uple of status text, color pair and attributes of the
status line, used to reset status after an error. status line, used to reset status after an error.
- history: an History object. - history: an History object.
- cache: a dict containing cached pages - cache: a dict containing cached pages.
- special_pages: a dict containing page names used with "bebop" scheme; - special_pages: a dict containing page names used with "bebop" scheme;
values are dicts as well: the "open" key maps to a callable to use when values are dicts as well: the "open" key maps to a callable to use when
the page is accessed, and the optional "source" key maps to callable the page is accessed, and the optional "source" key maps to callable
returning the page source path. returning the page source path.
- last_download: tuple of MimeType and path, or None. - last_download: tuple of MimeType and path, or None.
- identities: identities map.
""" """
def __init__(self, config, cert_stash): def __init__(self, config, cert_stash):
@ -65,6 +69,7 @@ class Browser:
self.cache = {} self.cache = {}
self.special_pages = self.setup_special_pages() self.special_pages = self.setup_special_pages()
self.last_download: Optional[Tuple[MimeType, Path]] = None self.last_download: Optional[Tuple[MimeType, Path]] = None
self.identities = load_identities(get_identities_list_path()) or {}
self._current_url = "" self._current_url = ""
@property @property
@ -86,6 +91,11 @@ class Browser:
self._current_url = url self._current_url = url
self.set_status(url) self.set_status(url)
@property
def current_scheme(self):
"""Return the scheme of the current URL."""
return parse_url(self._current_url)["scheme"] or ""
def setup_special_pages(self): def setup_special_pages(self):
"""Return a dict with the special pages functions.""" """Return a dict with the special pages functions."""
return { return {
@ -104,6 +114,7 @@ class Browser:
def run(self, *args, **kwargs): def run(self, *args, **kwargs):
"""Use curses' wrapper around _run.""" """Use curses' wrapper around _run."""
os.environ.setdefault("ESCDELAY", "25") os.environ.setdefault("ESCDELAY", "25")
logging.info("Cursing…")
curses.wrapper(self._run, *args, **kwargs) curses.wrapper(self._run, *args, **kwargs)
def _run(self, stdscr, start_url=None): def _run(self, stdscr, start_url=None):
@ -250,6 +261,7 @@ class Browser:
def refresh_status_line(self): def refresh_status_line(self):
"""Refresh status line contents.""" """Refresh status line contents."""
text, pair, attributes = self.status_data text, pair, attributes = self.status_data
logging.debug("Status: " + text)
text = text[:self.w - 1] text = text[:self.w - 1]
color = curses.color_pair(pair) color = curses.color_pair(pair)
self.status_line.addstr(0, 0, text, color | attributes) self.status_line.addstr(0, 0, text, color | attributes)
@ -295,6 +307,15 @@ class Browser:
from bebop.browser.gemini import forget_certificate from bebop.browser.gemini import forget_certificate
forget_certificate(self, words[1]) forget_certificate(self, words[1])
def get_user_text_input(self, status_text, char, prefix="", strip=False):
"""Get user input from the command-line."""
self.set_status(status_text)
result = self.command_line.focus(char, prefix=prefix)
self.reset_status()
if strip:
result = result.strip()
return result
def open_url(self, url, base_url=None, redirects=0, assume_absolute=False, def open_url(self, url, base_url=None, redirects=0, assume_absolute=False,
history=True, use_cache=True): history=True, use_cache=True):
"""Try to open an URL. """Try to open an URL.
@ -318,12 +339,14 @@ class Browser:
self.set_status_error(f"Too many redirections ({url}).") self.set_status_error(f"Too many redirections ({url}).")
return return
current_scheme = self.current_scheme or "gemini"
if assume_absolute or not self.current_url: if assume_absolute or not self.current_url:
parts = parse_url(url, absolute=True, default_scheme="gemini") parts = parse_url(url, absolute=True, default_scheme=current_scheme)
else: else:
parts = parse_url(url) parts = parse_url(url, default_scheme=current_scheme)
if parts["scheme"] is None and parts["netloc"] is None: # If there is a no netloc part, try to join the URL.
if parts["netloc"] is None and parts["scheme"] == current_scheme:
base_url = base_url or self.current_url base_url = base_url or self.current_url
if base_url: if base_url:
parts = parse_url(join_url(base_url, url)) parts = parse_url(join_url(base_url, url))
@ -331,10 +354,10 @@ class Browser:
self.set_status_error(f"Can't open '{url}'.") self.set_status_error(f"Can't open '{url}'.")
return return
# Replace URL passed as parameter by a proper absolute one. # Replace URL passed as parameter by a sanitized one.
url = unparse_url(parts) url = unparse_url(parts)
scheme = parts["scheme"] or "" scheme = parts["scheme"]
if scheme == "gemini": if scheme == "gemini":
from bebop.browser.gemini import open_gemini_url from bebop.browser.gemini import open_gemini_url
success = open_gemini_url( success = open_gemini_url(
@ -523,17 +546,15 @@ class Browser:
"""Add the current URL as bookmark.""" """Add the current URL as bookmark."""
if not self.current_url: if not self.current_url:
return return
self.set_status("Bookmark title?")
current_title = self.page_pad.current_page.title or "" current_title = self.page_pad.current_page.title or ""
title = self.command_line.focus( title = self.get_user_text_input(
"Bookmark title?",
CommandLine.CHAR_TEXT, CommandLine.CHAR_TEXT,
prefix=current_title prefix=current_title,
strip=True,
) )
if title: if title:
title = title.strip() save_bookmark(self.current_url, title)
if title:
save_bookmark(self.current_url, title)
self.reset_status()
def edit_page(self): def edit_page(self):
"""Open a text editor to edit the page source. """Open a text editor to edit the page source.
@ -572,7 +593,7 @@ class Browser:
def open_help(self): def open_help(self):
"""Show the help page.""" """Show the help page."""
self.open_internal_page("help", HELP_PAGE) self.open_internal_page("help", get_help(self.config))
def prompt(self, text, keys): def prompt(self, text, keys):
"""Display the text and allow it to type one of the given keys.""" """Display the text and allow it to type one of the given keys."""

View file

@ -1,10 +1,18 @@
"""Gemini-related features of the browser.""" """Gemini-related features of the browser."""
import logging
from pathlib import Path from pathlib import Path
from typing import Optional
from bebop.browser.browser import Browser from bebop.browser.browser import Browser
from bebop.command_line import CommandLine from bebop.command_line import CommandLine
from bebop.fs import get_downloads_path from bebop.fs import (
get_downloads_path, get_identities_path, get_identities_list_path
)
from bebop.identity import (
ClientCertificateException, create_certificate, get_cert_and_key,
get_identities_for_url, load_identities, save_identities
)
from bebop.navigation import set_parameter from bebop.navigation import set_parameter
from bebop.page import Page from bebop.page import Page
from bebop.protocol import Request, Response from bebop.protocol import Request, Response
@ -14,7 +22,13 @@ from bebop.tofu import trust_fingerprint, untrust_fingerprint, WRONG_FP_ALERT
MAX_URL_LEN = 1024 MAX_URL_LEN = 1024
def open_gemini_url(browser: Browser, url, redirects=0, use_cache=True): def open_gemini_url(
browser: Browser,
url: str,
redirects: int =0,
use_cache: bool =True,
cert_and_key=None
) -> Optional[str]:
"""Open a Gemini URL and set the formatted response as content. """Open a Gemini URL and set the formatted response as content.
While the specification is not set in stone, every client takes a slightly While the specification is not set in stone, every client takes a slightly
@ -27,7 +41,7 @@ def open_gemini_url(browser: Browser, url, redirects=0, use_cache=True):
- STATE_INVALID_CERT: the certificate has non-fatal issues; we may - STATE_INVALID_CERT: the certificate has non-fatal issues; we may
present the user the problems found and let her decide whether to trust present the user the problems found and let her decide whether to trust
temporarily the certificate or not BUT we currently do not parse the temporarily the certificate or not BUT we currently do not parse the
certificate's fields, so this state is never used. certificate's fields, not even the pubkey, so this state is never used.
- STATE_UNKNOWN_CERT: the certificate is valid but has not been seen before; - STATE_UNKNOWN_CERT: the certificate is valid but has not been seen before;
as we're doing TOFU here, we could automatically trust it or let the user as we're doing TOFU here, we could automatically trust it or let the user
choose. For simplicity, we always trust it permanently. choose. For simplicity, we always trust it permanently.
@ -37,23 +51,38 @@ def open_gemini_url(browser: Browser, url, redirects=0, use_cache=True):
- url: a valid URL with Gemini scheme to open. - url: a valid URL with Gemini scheme to open.
- redirects: current amount of redirections done to open the initial URL. - redirects: current amount of redirections done to open the initial URL.
- use_cache: if true, look up if the page is cached before requesting it. - use_cache: if true, look up if the page is cached before requesting it.
- cert_and_key: if not None, a tuple of paths to a client cert/key to use.
Returns: Returns:
True on success, False otherwise. The final successfully handled URL on success, None otherwise. Redirected
URLs are not returned.
""" """
if len(url) >= MAX_URL_LEN: if len(url) >= MAX_URL_LEN:
browser.set_status_error("Request URL too long.") browser.set_status_error("Request URL too long.")
return return None
browser.set_status(f"Loading {url}") loading_message_verb = "Loading" if redirects == 0 else "Redirecting to"
loading_message = f"{loading_message_verb} {url}"
browser.set_status(loading_message)
# If this URL used to request an identity, provide it.
if not cert_and_key:
url_identities = get_identities_for_url(browser.identities, url)
identity = select_identity(url_identities)
if identity:
cert_and_key = get_cert_and_key(identity["id"])
if use_cache and url in browser.cache: if use_cache and url in browser.cache:
browser.load_page(browser.cache[url]) browser.load_page(browser.cache[url])
browser.current_url = url browser.current_url = url
browser.set_status(url) browser.set_status(url)
return True return url
req = Request(url, browser.stash) logging.info(
f"Request {url}"
+ (f" using cert and key {cert_and_key}" if cert_and_key else "")
)
req = Request(url, browser.stash, identity=cert_and_key)
connect_timeout = browser.config["connect_timeout"] connect_timeout = browser.config["connect_timeout"]
connected = req.connect(connect_timeout) connected = req.connect(connect_timeout)
if not connected: if not connected:
@ -68,7 +97,7 @@ def open_gemini_url(browser: Browser, url, redirects=0, use_cache=True):
else: else:
error = f"Connection failed ({url})." error = f"Connection failed ({url})."
browser.set_status_error(error) browser.set_status_error(error)
return False return None
if req.state == Request.STATE_INVALID_CERT: if req.state == Request.STATE_INVALID_CERT:
pass pass
@ -87,11 +116,11 @@ def open_gemini_url(browser: Browser, url, redirects=0, use_cache=True):
data = req.proceed() data = req.proceed()
if not data: if not data:
browser.set_status_error(f"Server did not respond in time ({url}).") browser.set_status_error(f"Server did not respond in time ({url}).")
return False return None
response = Response.parse(data) response = Response.parse(data)
if not response: if not response:
browser.set_status_error(f"Server response parsing failed ({url}).") browser.set_status_error(f"Server response parsing failed ({url}).")
return False return None
return _handle_response(browser, response, url, redirects) return _handle_response(browser, response, url, redirects)
@ -116,26 +145,43 @@ def _handle_untrusted_cert(browser: Browser, request: Request):
browser.load_page(alert_page) browser.load_page(alert_page)
def _handle_response(browser: Browser, response: Response, url: str, def _handle_response(
redirects: int): browser: Browser,
response: Response,
url: str,
redirects: int
) -> Optional[str]:
"""Handle a response from a Gemini server. """Handle a response from a Gemini server.
Returns: Returns:
True on success, False otherwise. The final URL on success, None otherwise.
""" """
logging.info(f"Response {response.code} {response.meta}")
if response.code == 20: if response.code == 20:
return _handle_successful_response(browser, response, url) return _handle_successful_response(browser, response, url)
elif response.generic_code == 30 and response.meta: elif response.generic_code == 30 and response.meta:
browser.open_url(response.meta, base_url=url, redirects=redirects + 1) # On redirections, we go back to open_url as the redirection may be to
# another protocol. Discard the result of this request.
browser.open_url(
response.meta,
base_url=url,
redirects=redirects + 1
)
elif response.generic_code in (40, 50): elif response.generic_code in (40, 50):
error = f"Server error: {response.meta or Response.code.name}" error = f"Server error: {response.meta or Response.code.name}"
browser.set_status_error(error) browser.set_status_error(error)
elif response.generic_code == 10: elif response.generic_code == 10:
_handle_input_request(browser, url, response.meta) return _handle_input_request(browser, url, response.meta)
elif response.code == 60:
return _handle_cert_required(browser, response, url, redirects)
elif response.code in (61, 62):
details = response.meta or Response.code.name
error = f"Client certificate error: {details}"
browser.set_status_error(error)
else: else:
error = f"Unhandled response code {response.code}" error = f"Unhandled response code {response.code}"
browser.set_status_error(error) browser.set_status_error(error)
return False return None
def _handle_successful_response(browser: Browser, response: Response, url: str): def _handle_successful_response(browser: Browser, response: Response, url: str):
@ -155,7 +201,7 @@ def _handle_successful_response(browser: Browser, response: Response, url: str):
- response: a successful Response. - response: a successful Response.
Returns: Returns:
True on success, False otherwise. The successfully handled URL on success, None otherwise.
""" """
# Use appropriate response parser according to the MIME type. # Use appropriate response parser according to the MIME type.
mime_type = response.get_mime_type() mime_type = response.get_mime_type()
@ -175,7 +221,8 @@ def _handle_successful_response(browser: Browser, response: Response, url: str):
text = response.content.decode("utf-8", errors="replace") text = response.content.decode("utf-8", errors="replace")
page = Page.from_text(text) page = Page.from_text(text)
else: else:
filepath = _get_download_path(url) download_dir = browser.config["download_path"]
filepath = _get_download_path(url, download_dir=download_dir)
# If a page has been produced, load it. Else if a file has been retrieved, # If a page has been produced, load it. Else if a file has been retrieved,
# download it. # download it.
@ -184,7 +231,7 @@ def _handle_successful_response(browser: Browser, response: Response, url: str):
browser.current_url = url browser.current_url = url
browser.cache[url] = page browser.cache[url] = page
browser.set_status(url) browser.set_status(url)
return True return url
elif filepath: elif filepath:
try: try:
with open(filepath, "wb") as download_file: with open(filepath, "wb") as download_file:
@ -194,26 +241,36 @@ def _handle_successful_response(browser: Browser, response: Response, url: str):
else: else:
browser.set_status(f"Downloaded {url} ({mime_type.short}).") browser.set_status(f"Downloaded {url} ({mime_type.short}).")
browser.last_download = mime_type, filepath browser.last_download = mime_type, filepath
return True return url
elif error: elif error:
browser.set_status_error(error) browser.set_status_error(error)
return False return None
def _get_download_path(url: str) -> Path: def _get_download_path(url: str, download_dir: Optional[str] =None) -> Path:
"""Try to find the best download file path possible from this URL.""" """Try to find the best download file path possible from this URL."""
download_dir = get_downloads_path() download_path = Path(download_dir) if download_dir else get_downloads_path()
if not download_path.exists():
download_path.mkdir(parents=True)
url_parts = url.rsplit("/", maxsplit=1) url_parts = url.rsplit("/", maxsplit=1)
if url_parts: if url_parts:
filename = url_parts[-1] filename = url_parts[-1]
else: else:
filename = url.split("://")[1] if "://" in url else url filename = url.split("://")[1] if "://" in url else url
filename = filename.replace("/", "_") filename = filename.replace("/", "_")
return download_dir / filename return download_path / filename
def _handle_input_request(browser: Browser, from_url: str, message: str =None): def _handle_input_request(
"""Focus command-line to pass input to the server.""" browser: Browser,
from_url: str,
message: str =None
) -> Optional[str]:
"""Focus command-line to pass input to the server.
Returns:
The result of `open_gemini_url` with the new request including user input.
"""
if message: if message:
browser.set_status(f"Input needed: {message}") browser.set_status(f"Input needed: {message}")
else: else:
@ -222,12 +279,86 @@ def _handle_input_request(browser: Browser, from_url: str, message: str =None):
if not user_input: if not user_input:
return return
url = set_parameter(from_url, user_input) url = set_parameter(from_url, user_input)
open_gemini_url(browser, url) return open_gemini_url(browser, url)
def _handle_cert_required(
browser: Browser,
response: Response,
url: str,
redirects: int
) -> Optional[str]:
"""Find a matching identity and resend the request with it.
Returns:
The result of `open_gemini_url` with the client certificate provided.
"""
identities = load_identities(get_identities_list_path())
if not identities:
browser.set_status_error(f"Can't load identities.")
return None
browser.identities = identities
url_identities = get_identities_for_url(browser.identities, url)
if not url_identities:
identity = create_identity(browser, url)
if not identity:
return None
browser.identities[url] = [identity]
save_identities(browser.identities, get_identities_list_path())
else:
identity = select_identity(url_identities)
cert_path, key_path = get_cert_and_key(identity["id"])
return open_gemini_url(
browser,
url,
redirects=redirects + 1,
use_cache=False,
cert_and_key=(cert_path, key_path)
)
def select_identity(identities: list):
"""Let user select the appropriate identity among candidates."""
# TODO support multiple identities; for now we just use the first available.
return identities[0] if identities else None
def create_identity(browser: Browser, url: str):
"""Walk the user through identity creation.
Returns:
The created identity on success (already registered in identities
"""
key = browser.prompt("Create client certificate? [y/n]", "yn")
if key != "y":
browser.reset_status()
return None
common_name = browser.get_user_text_input(
"Name? The server will see this, you can leave it empty.",
CommandLine.CHAR_TEXT,
strip=True,
)
if not common_name:
browser.reset_status()
return None
browser.set_status("Generating certificate…")
try:
mangled_name = create_certificate(url, common_name)
except ClientCertificateException as exc:
browser.set_status_error(exc.message)
return None
browser.reset_status()
return {"name": common_name, "id": mangled_name}
def forget_certificate(browser: Browser, hostname: str): def forget_certificate(browser: Browser, hostname: str):
"""Remove the fingerprint associated to this hostname for the cert stash.""" """Remove the fingerprint associated to this hostname for the cert stash."""
key = browser.prompt(f"Remove fingerprint from {hostname}? [y/N]", "ynN") key = browser.prompt(f"Remove fingerprint for {hostname}? [y/n]", "yn")
if key != "y": if key != "y":
browser.reset_status() browser.reset_status()
return return

View file

@ -5,6 +5,7 @@ import curses.ascii
import curses.textpad import curses.textpad
import os import os
import tempfile import tempfile
from typing import Optional
from bebop.external import open_external_program from bebop.external import open_external_program
from bebop.links import Links from bebop.links import Links
@ -38,7 +39,7 @@ class CommandLine:
self.window.clear() self.window.clear()
self.window.refresh() self.window.refresh()
def gather(self): def gather(self) -> str:
"""Return the string currently written by the user in command line. """Return the string currently written by the user in command line.
This doesn't count the command char used, but it includes then prefix. This doesn't count the command char used, but it includes then prefix.
@ -46,7 +47,13 @@ class CommandLine:
""" """
return self.textbox.gather()[1:].rstrip() return self.textbox.gather()[1:].rstrip()
def focus(self, command_char, validator=None, prefix=""): def focus(
self,
command_char,
validator=None,
prefix="",
escape_to_none=False
) -> Optional[str]:
"""Give user focus to the command bar. """Give user focus to the command bar.
Show the command char and give focus to the command textbox. The Show the command char and give focus to the command textbox. The
@ -58,10 +65,12 @@ class CommandLine:
- validator: function to use to validate the input chars; if omitted, - validator: function to use to validate the input chars; if omitted,
`validate_common_input` is used. `validate_common_input` is used.
- prefix: string to insert before the cursor in the command line. - prefix: string to insert before the cursor in the command line.
- escape_to_none: if True, an escape interruption returns None instead
of an empty string.
Returns: Returns:
User input as string. The string will be empty if the validator raised User input as string. The string will be empty if the validator raised
an EscapeInterrupt. an EscapeInterrupt, unless `escape_to_none` is True.
""" """
validator = validator or self._validate_common_input validator = validator or self._validate_common_input
self.window.clear() self.window.clear()
@ -71,7 +80,7 @@ class CommandLine:
try: try:
command = self.textbox.edit(validator) command = self.textbox.edit(validator)
except EscapeCommandInterrupt: except EscapeCommandInterrupt:
command = "" command = "" if not escape_to_none else None
except TerminateCommandInterrupt as exc: except TerminateCommandInterrupt as exc:
command = exc.command command = exc.command
else: else:

View file

@ -1,12 +1,14 @@
"""Config management.""" """Config management."""
import json import json
import logging
import os.path import os.path
DEFAULT_CONFIG = { DEFAULT_CONFIG = {
"connect_timeout": 10, "connect_timeout": 10,
"text_width": 80, "text_width": 80,
"download_path": "",
"source_editor": ["vi"], "source_editor": ["vi"],
"command_editor": ["vi"], "command_editor": ["vi"],
"history_limit": 1000, "history_limit": 1000,
@ -24,9 +26,9 @@ def load_config(config_path):
with open(config_path, "rt") as config_file: with open(config_path, "rt") as config_file:
config = json.load(config_file) config = json.load(config_file)
except OSError as exc: except OSError as exc:
print(f"Could not read config file {config_path}: {exc}") logging.error(f"Could not read config file {config_path}: {exc}")
except ValueError as exc: except ValueError as exc:
print(f"Could not parse config file {config_path}: {exc}") logging.error(f"Could not parse config file {config_path}: {exc}")
else: else:
# Fill missing values with defaults. # Fill missing values with defaults.
for key, value in DEFAULT_CONFIG.items(): for key, value in DEFAULT_CONFIG.items():
@ -41,4 +43,4 @@ def create_default_config(config_path):
with open(config_path, "wt") as config_file: with open(config_path, "wt") as config_file:
json.dump(DEFAULT_CONFIG, config_file, indent=2) json.dump(DEFAULT_CONFIG, config_file, indent=2)
except OSError as exc: except OSError as exc:
print(f"Could not create config file {config_path}: {exc}") logging.error(f"Could not create config file {config_path}: {exc}")

View file

@ -8,6 +8,7 @@ from functools import lru_cache
from os import getenv from os import getenv
from os.path import expanduser from os.path import expanduser
from pathlib import Path from pathlib import Path
from typing import Optional
APP_NAME = "bebop" APP_NAME = "bebop"
@ -29,7 +30,7 @@ def get_user_data_path() -> Path:
@lru_cache(None) @lru_cache(None)
def get_downloads_path() -> Path: def get_downloads_path() -> Path:
"""Return the user downloads directory path.""" """Return the user downloads directory path (fallbacks to home dir)."""
xdg_config_path = Path(getenv("XDG_CONFIG_HOME", expanduser("~/.config"))) xdg_config_path = Path(getenv("XDG_CONFIG_HOME", expanduser("~/.config")))
download_path = "" download_path = ""
try: try:
@ -47,9 +48,36 @@ def get_downloads_path() -> Path:
return Path.home() return Path.home()
def ensure_bebop_files_exist(): @lru_cache(None)
"""Ensure various Bebop's files or directories are present.""" def get_identities_list_path():
# Ensure the user data directory exists. """Return the identities JSON file path."""
user_data_path = get_user_data_path() return get_user_data_path() / "identities.json"
if not user_data_path.exists():
user_data_path.mkdir(parents=True)
@lru_cache(None)
def get_identities_path():
"""Return the directory where identities are stored."""
return get_user_data_path() / "identities"
def ensure_bebop_files_exist() -> Optional[str]:
"""Ensure various Bebop's files or directories are present.
Returns:
None if all files and directories are present, an error string otherwise.
"""
try:
# Ensure the user data directory exists.
user_data_path = get_user_data_path()
if not user_data_path.exists():
user_data_path.mkdir(parents=True)
# Ensure the identities file and directory exists.
identities_file_path = get_identities_list_path()
if not identities_file_path.exists():
with open(identities_file_path, "wt") as identities_file:
identities_file.write("{}")
identities_path = get_identities_path()
if not identities_path.exists():
identities_path.mkdir(parents=True)
except OSError as exc:
return str(exc)

View file

@ -46,4 +46,18 @@ with arguments by pressing the corresponding keybind above.
* o/open <url>: open this URL * o/open <url>: open this URL
* forget_certificate <hostname>: remove saved fingerprint for the hostname * forget_certificate <hostname>: remove saved fingerprint for the hostname
* q/quit: well, quit * q/quit: well, quit
## Configuration
The current configuration is:
{config_list}
""" """
def get_help(config):
config_list = "\n".join(
f"* {key} = {value}"
for key, value in config.items()
)
return HELP_PAGE.format(config_list=config_list)

131
bebop/identity.py Normal file
View file

@ -0,0 +1,131 @@
"""Identity management, i.e. client certificates.
Identities are created when a server requests them for the first time, and saved
with the corresponding URL. The certificate is automatically presented when the
URL is revisited, and all "children" URLs.
Identities are stored on disk as pairs of certificates/keys. URLs are stored in
an identity file, `identities.json`, a simple URL dict that can be looked up for
identities to use, mapped to an ID to identify the cert/key files.
The identity file and the identities dict both have the following format:
``` json
{
"gemini://example.com/app": [
{
"name": "test",
"id": "geminiexamplecomapp-test",
}
]
}
```
"""
import hashlib
import json
import logging
import secrets
import string
import subprocess
from pathlib import Path
from typing import Optional, Union
from bebop.fs import get_identities_path, get_user_data_path
def load_identities(identities_path: Path) -> Optional[dict]:
"""Return saved identities or None on error."""
identities = {}
try:
with open(identities_path, "rt") as identities_file:
identities = json.load(identities_file)
except (OSError, ValueError) as exc:
logging.error(f"Failed to load identities '{identities_path}': {exc}")
return None
return identities
def save_identities(identities: dict, identities_path: Path):
"""Save the certificate stash. Return True on success."""
try:
with open(identities_path, "wt") as identities_file:
json.dump(identities, identities_file)
except (OSError, ValueError) as exc:
logging.error(f"Failed to save identities '{identities_path}': {exc}")
return False
return True
class ClientCertificateException(Exception):
def __init__(self, message: str) -> None:
super().__init__()
self.message = message
def get_identities_for_url(identities: dict, url: str) -> list:
"""For a given URL, return all its identities.
If several URLs are prefixes of the given URL, e.g. we look up
"gemini://host/app/sub" and there are identities for both
"gemini://host/app" and "gemini://host/app/sub", the longest URL's
identities are returned (here the latter).
"""
candidates = [key for key in identities if url.startswith(key)]
if not candidates:
return []
return identities[max(candidates, key=len)]
def get_cert_and_key(cert_id: str):
"""Return the paths of the certificate and key file for this ID."""
directory = get_identities_path()
return directory / f"{cert_id}.crt", directory / f"{cert_id}.key"
def create_certificate(url: str, common_name: str):
"""Create a secure self-signed certificate using system's OpenSSL."""
identities_path = get_identities_path()
mangled_name = get_mangled_name(url, common_name)
cert_path = identities_path / f"{mangled_name}.crt"
key_path = identities_path / f"{mangled_name}.key"
command = [
"openssl", "req",
"-newkey", "rsa:4096",
"-nodes",
"-keyform", "PEM",
"-keyout", str(key_path),
"-x509",
"-days", "28140", # https://www.youtube.com/watch?v=F9L4q-0Pi4E
"-outform", "PEM",
"-out", str(cert_path),
"-subj", f"/CN={common_name}",
]
try:
subprocess.check_call(
command,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
except subprocess.CalledProcessError as exc:
error = "Could not create certificate: " + str(exc)
raise ClientCertificateException(error)
return mangled_name
def get_mangled_name(url: str, common_name: str) -> str:
"""Return a mangled name for the certificate and key files.
This is not obfuscation at all. The mangling is extremely simple and is
just a way to produce names easier on the file system than full URLs.
The mangling is:
`sha256(md5(url) + "-" + common_name + "-" + 8_random_hex_digits)`
with characters that can't be UTF-8 encoded replaced by U+FFFD REPLACEMENT
CHARACTER.
"""
encoded_url = hashlib.md5(url.encode(errors="replace")).hexdigest()
random_hex = hex(secrets.randbits(32))[2:].zfill(8)
name = f"{encoded_url}-{common_name}-{random_hex}"
return hashlib.sha256(name.encode(errors="replace")).hexdigest()

View file

@ -45,11 +45,11 @@ def parse_url(
- absolute: assume the URL is absolute, e.g. in the case we are trying to - absolute: assume the URL is absolute, e.g. in the case we are trying to
parse an URL an user has written, which is most of the time an absolute parse an URL an user has written, which is most of the time an absolute
URL even if not perfectly so. This only has an effect if, after the URL even if not perfectly so. This only has an effect if, after the
initial parsing, there is no scheme or netloc available. initial parsing, there is no netloc available.
- default_scheme: specify the scheme to use if the URL either does not - default_scheme: specify the scheme to use if the URL either does not
specify it and we need it (e.g. there is a location), or `absolute` is specify it and we need it (e.g. there is a location), or `absolute` is
true; if absolute is true but `default_scheme` is not specified, use the true; if absolute is true but `default_scheme` is not specified, a netloc
gemini scheme. marker ("//") is prefixed without scheme.
Returns: Returns:
URL parts, as a dictionary with the following keys: "scheme", "netloc", URL parts, as a dictionary with the following keys: "scheme", "netloc",
@ -69,10 +69,12 @@ def parse_url(
for k in ("scheme", "netloc", "path", "query", "fragment") for k in ("scheme", "netloc", "path", "query", "fragment")
} }
# Smol hack: if we assume it's an absolute URL, just prefix scheme and "//". # Smol hack: if we assume it's an absolute URL and no netloc has been found,
if absolute and not parts["scheme"] and not parts["netloc"]: # just prefix default scheme (if any) and "//".
scheme = default_scheme or "gemini" if absolute and not parts["netloc"]:
return parse_url(scheme + "://" + url) scheme = parts["scheme"] or default_scheme
prefix = scheme + "://" if scheme else "//"
return parse_url(prefix + url)
# Another smol hack: if there is no scheme, use `default_scheme` as default. # Another smol hack: if there is no scheme, use `default_scheme` as default.
if default_scheme and parts["scheme"] is None: if default_scheme and parts["scheme"] is None:

View file

@ -33,7 +33,10 @@ class PagePad:
if x <= 0 or y <= 0: if x <= 0 or y <= 0:
return return
content_position = self.current_line, self.current_column content_position = self.current_line, self.current_column
self.pad.refresh(*content_position, 0, 0, x, y) try:
self.pad.refresh(*content_position, 0, 0, x, y)
except curses.error:
pass
def scroll_v(self, num_lines: int, window_height: int =0): def scroll_v(self, num_lines: int, window_height: int =0):
"""Make the content pad scroll up and down by num_lines. """Make the content pad scroll up and down by num_lines.

View file

@ -58,7 +58,7 @@ class Request:
# Connection failed. # Connection failed.
STATE_CONNECTION_FAILED = 7 STATE_CONNECTION_FAILED = 7
def __init__(self, url, cert_stash): def __init__(self, url, cert_stash, identity=None):
self.url = url self.url = url
self.cert_stash = cert_stash self.cert_stash = cert_stash
self.state = Request.STATE_INIT self.state = Request.STATE_INIT
@ -67,6 +67,7 @@ class Request:
self.ssock = None self.ssock = None
self.cert_validation = None self.cert_validation = None
self.error = "" self.error = ""
self.identity = identity
def connect(self, timeout: int) -> bool: def connect(self, timeout: int) -> bool:
"""Connect to a Gemini server and return a RequestEventType. """Connect to a Gemini server and return a RequestEventType.
@ -120,6 +121,7 @@ class Request:
check the whole cert fingerprint. Here it is considered the same as a check the whole cert fingerprint. Here it is considered the same as a
valid certificate. valid certificate.
""" """
# Get hostname and port from the URL.
url_parts = GEMINI_URL_RE.match(self.url) url_parts = GEMINI_URL_RE.match(self.url)
if not url_parts: if not url_parts:
self.state = Request.STATE_INVALID_URL self.state = Request.STATE_INVALID_URL
@ -136,6 +138,7 @@ class Request:
port = 1965 port = 1965
self.hostname = hostname self.hostname = hostname
# Prepare the Gemini request.
try: try:
self.payload = self.url.encode() self.payload = self.url.encode()
except ValueError: except ValueError:
@ -143,6 +146,7 @@ class Request:
return False return False
self.payload += LINE_TERM self.payload += LINE_TERM
# Connect to the server.
try: try:
sock = socket.create_connection((hostname, port), timeout=timeout) sock = socket.create_connection((hostname, port), timeout=timeout)
except OSError as exc: except OSError as exc:
@ -150,7 +154,10 @@ class Request:
self.error = exc.strerror self.error = exc.strerror
return False return False
# Setup TLS.
context = Request.get_ssl_context() context = Request.get_ssl_context()
if self.identity:
context.load_cert_chain(*self.identity)
try: try:
self.ssock = context.wrap_socket(sock, server_hostname=hostname) self.ssock = context.wrap_socket(sock, server_hostname=hostname)
except OSError as exc: except OSError as exc:
@ -159,6 +166,7 @@ class Request:
self.error = exc.strerror self.error = exc.strerror
return False return False
# Validate server certificate.
der = self.ssock.getpeercert(binary_form=True) der = self.ssock.getpeercert(binary_form=True)
self.cert_validation = validate_cert(der, hostname, self.cert_stash) self.cert_validation = validate_cert(der, hostname, self.cert_stash)
cert_status = self.cert_validation["status"] cert_status = self.cert_validation["status"]

View file

@ -0,0 +1,32 @@
import unittest
from ..identity import get_identities_for_url
def get_fake_identity(ident: int):
return {"name": f"test{ident}", "id": f"lol{ident}"}
class TestIdentity(unittest.TestCase):
def test_get_identities_for_url(self):
result = get_identities_for_url({}, "gemini://host/path")
self.assertListEqual(result, [])
identities = {
"gemini://host/path": [get_fake_identity(1)],
"gemini://otherhost/path": [get_fake_identity(2)],
}
result = get_identities_for_url(identities, "gemini://host/path")
self.assertListEqual(result, identities["gemini://host/path"])
result = get_identities_for_url(identities, "gemini://bad/path")
self.assertListEqual(result, [])
identities["gemini://host/path/sub"] = [get_fake_identity(3)]
result = get_identities_for_url(identities, "gemini://host/path/sub")
self.assertListEqual(result, identities["gemini://host/path/sub"])
result = get_identities_for_url(identities, "gemini://host/path/sub/a")
self.assertListEqual(result, identities["gemini://host/path/sub"])
result = get_identities_for_url(identities, "gemini://host/path/sus")
self.assertListEqual(result, identities["gemini://host/path"])

View file

@ -35,7 +35,7 @@ class TestNavigation(unittest.TestCase):
# No scheme nor netloc but we should pretend having an absolute URL. # No scheme nor netloc but we should pretend having an absolute URL.
res = parse_url("dece.space/parse-me.gmi", absolute=True) res = parse_url("dece.space/parse-me.gmi", absolute=True)
self.assertEqual(res["scheme"], "gemini") self.assertIsNone(res["scheme"])
self.assertEqual(res["netloc"], "dece.space") self.assertEqual(res["netloc"], "dece.space")
self.assertEqual(res["path"], "/parse-me.gmi") self.assertEqual(res["path"], "/parse-me.gmi")

View file

@ -5,6 +5,7 @@ requires more clarity both in specification and in our own implementation.
""" """
import hashlib import hashlib
import logging
import re import re
from enum import Enum from enum import Enum
from pathlib import Path from pathlib import Path
@ -57,8 +58,8 @@ confident enough before trusting this new certificate.
### How to ensure this new certificate can be trusted? ### How to ensure this new certificate can be trusted?
Can you join the owner through mail or instant messaging? This is the simplest \ Can you join the owner through mail or instant messaging? This is the simplest \
way for you to make sure that the server is fine, and maybe alert the owner on \ way for you to make sure that the server is fine, and maybe alert the server \
a problem on his server she did not notice. owner that there might be an issue.
""" """
@ -68,7 +69,7 @@ def get_cert_stash_path() -> Path:
def load_cert_stash(stash_path: Path) -> Optional[Dict]: def load_cert_stash(stash_path: Path) -> Optional[Dict]:
"""Load the certificate stash from the file, or None on error. """Return the certificate stash from the file, or None on error.
The stash is a dict with host names as keys and tuples as values. Tuples The stash is a dict with host names as keys and tuples as values. Tuples
have four elements: have four elements:
@ -105,7 +106,7 @@ def save_cert_stash(stash: dict, stash_path: Path):
entry_line = f"{name} {algo} {fingerprint}\n" entry_line = f"{name} {algo} {fingerprint}\n"
stash_file.write(entry_line) stash_file.write(entry_line)
except (OSError, ValueError) as exc: except (OSError, ValueError) as exc:
print(f"Failed to save certificate stash '{stash_path}': {exc}") logging.error(f"Failed to save certificate stash '{stash_path}': {exc}")
class CertStatus(Enum): class CertStatus(Enum):