Compare commits

..

6 commits

Author SHA1 Message Date
dece 62997b9385 help: display current config 2021-05-13 01:26:50 +02:00
dece f63d875fc3 logging 2021-05-13 01:25:50 +02:00
dece 8b1561e689 identity: present cert instead of waiting for 60 2021-05-13 01:24:29 +02:00
dece 57f01720d6 identity: add basic identity management 2021-05-12 22:34:18 +02:00
dece 6ceb75b84c config: add download_path 2021-05-09 23:02:56 +02:00
dece 056616b130 page_pad: fix crashes on some fail refresh
would have been better to understand why it crashed but yolo
2021-05-09 15:55:27 +02:00
16 changed files with 491 additions and 82 deletions

View file

@ -17,12 +17,22 @@ TODO DONE
view history
open last download
media files
home page
identity management
--------------------------------------------------------------------------------
logging
home page
BACKLOG
--------------------------------------------------------------------------------
bug: can't input unicode
click on links to open them
download to disk, not in memory
bug: can't reload bebop: pages
bug: exiting editor breaks curses
dumb rendering mode per site
disable cache per site
well, preferences per site maybe?
download without memory buffer
download in the background
download view instead of last download
does encoding really work? cf. egsam
@ -35,5 +45,7 @@ bug: combining chars reduce lengths
non shit command-line
response code 11 (if still there)
gopher?
save history
opt. maintain history between sessions
history (forward) (useful?)
search in page (ugh)
get/set config using command-line

View file

@ -77,6 +77,7 @@ Here are the available options:
|----------------------------|--------------|----------------|---------------------------------------|
| `connect_timeout` | int | 10 | Seconds before connection times out. |
| `text_width` | int | 80 | Rendered line length. |
| `download_path` | string | | Download path. |
| `source_editor` | string list | `["vi"]` | Command to use for editing sources. |
| `command_editor` | string list | `["vi"]` | Command to use for editing CLI input. |
| `history_limit` | int | 1000 | Maximum entries in history. |

View file

@ -1,4 +1,5 @@
import argparse
import logging
from bebop.browser.browser import Browser
from bebop.config import load_config
@ -9,8 +10,18 @@ from bebop.tofu import get_cert_stash_path, load_cert_stash, save_cert_stash
def main():
argparser = argparse.ArgumentParser()
argparser.add_argument("url", nargs="?", default=None)
argparser.add_argument("-d", "--debug", action="store_true")
args = argparser.parse_args()
if args.debug:
logging.basicConfig(
filename="bebop.log",
level=logging.DEBUG,
format="%(asctime)s %(levelname)-8s %(message)s"
)
else:
logging.disable()
if args.url:
start_url = args.url
else:
@ -19,7 +30,10 @@ def main():
config_path = get_config_path()
config = load_config(config_path)
ensure_bebop_files_exist()
bebop_files_error = ensure_bebop_files_exist()
if bebop_files_error:
logging.critical(f"Failed to create files: {bebop_files_error}")
return
cert_stash_path = get_cert_stash_path()
cert_stash = load_cert_stash(cert_stash_path) or {}

View file

@ -3,12 +3,13 @@
import curses
import curses.ascii
import curses.textpad
import logging
import os
import subprocess
import tempfile
from math import inf
from pathlib import Path
from typing import Optional, Tuple
from typing import Dict, Optional, Tuple
from bebop.bookmarks import (
get_bookmarks_path, get_bookmarks_document, save_bookmark
@ -16,8 +17,10 @@ from bebop.bookmarks import (
from bebop.colors import ColorPair, init_colors
from bebop.command_line import CommandLine
from bebop.external import open_external_program
from bebop.help import HELP_PAGE
from bebop.fs import get_identities_list_path
from bebop.help import get_help
from bebop.history import History
from bebop.identity import load_identities
from bebop.links import Links
from bebop.mime import MimeType
from bebop.mouse import ButtonState
@ -43,12 +46,13 @@ class Browser:
- status_data: 3-uple of status text, color pair and attributes of the
status line, used to reset status after an error.
- history: an History object.
- cache: a dict containing cached pages
- cache: a dict containing cached pages.
- special_pages: a dict containing page names used with "bebop" scheme;
values are dicts as well: the "open" key maps to a callable to use when
the page is accessed, and the optional "source" key maps to callable
returning the page source path.
- last_download: tuple of MimeType and path, or None.
- identities: identities map.
"""
def __init__(self, config, cert_stash):
@ -65,6 +69,7 @@ class Browser:
self.cache = {}
self.special_pages = self.setup_special_pages()
self.last_download: Optional[Tuple[MimeType, Path]] = None
self.identities = load_identities(get_identities_list_path()) or {}
self._current_url = ""
@property
@ -86,6 +91,11 @@ class Browser:
self._current_url = url
self.set_status(url)
@property
def current_scheme(self):
"""Return the scheme of the current URL."""
return parse_url(self._current_url)["scheme"] or ""
def setup_special_pages(self):
"""Return a dict with the special pages functions."""
return {
@ -104,6 +114,7 @@ class Browser:
def run(self, *args, **kwargs):
"""Use curses' wrapper around _run."""
os.environ.setdefault("ESCDELAY", "25")
logging.info("Cursing…")
curses.wrapper(self._run, *args, **kwargs)
def _run(self, stdscr, start_url=None):
@ -250,6 +261,7 @@ class Browser:
def refresh_status_line(self):
"""Refresh status line contents."""
text, pair, attributes = self.status_data
logging.debug("Status: " + text)
text = text[:self.w - 1]
color = curses.color_pair(pair)
self.status_line.addstr(0, 0, text, color | attributes)
@ -295,6 +307,15 @@ class Browser:
from bebop.browser.gemini import forget_certificate
forget_certificate(self, words[1])
def get_user_text_input(self, status_text, char, prefix="", strip=False):
"""Get user input from the command-line."""
self.set_status(status_text)
result = self.command_line.focus(char, prefix=prefix)
self.reset_status()
if strip:
result = result.strip()
return result
def open_url(self, url, base_url=None, redirects=0, assume_absolute=False,
history=True, use_cache=True):
"""Try to open an URL.
@ -318,12 +339,14 @@ class Browser:
self.set_status_error(f"Too many redirections ({url}).")
return
current_scheme = self.current_scheme or "gemini"
if assume_absolute or not self.current_url:
parts = parse_url(url, absolute=True, default_scheme="gemini")
parts = parse_url(url, absolute=True, default_scheme=current_scheme)
else:
parts = parse_url(url)
parts = parse_url(url, default_scheme=current_scheme)
if parts["scheme"] is None and parts["netloc"] is None:
# If there is a no netloc part, try to join the URL.
if parts["netloc"] is None and parts["scheme"] == current_scheme:
base_url = base_url or self.current_url
if base_url:
parts = parse_url(join_url(base_url, url))
@ -331,10 +354,10 @@ class Browser:
self.set_status_error(f"Can't open '{url}'.")
return
# Replace URL passed as parameter by a proper absolute one.
# Replace URL passed as parameter by a sanitized one.
url = unparse_url(parts)
scheme = parts["scheme"] or ""
scheme = parts["scheme"]
if scheme == "gemini":
from bebop.browser.gemini import open_gemini_url
success = open_gemini_url(
@ -523,17 +546,15 @@ class Browser:
"""Add the current URL as bookmark."""
if not self.current_url:
return
self.set_status("Bookmark title?")
current_title = self.page_pad.current_page.title or ""
title = self.command_line.focus(
title = self.get_user_text_input(
"Bookmark title?",
CommandLine.CHAR_TEXT,
prefix=current_title
prefix=current_title,
strip=True,
)
if title:
title = title.strip()
if title:
save_bookmark(self.current_url, title)
self.reset_status()
def edit_page(self):
"""Open a text editor to edit the page source.
@ -572,7 +593,7 @@ class Browser:
def open_help(self):
"""Show the help page."""
self.open_internal_page("help", HELP_PAGE)
self.open_internal_page("help", get_help(self.config))
def prompt(self, text, keys):
"""Display the text and allow it to type one of the given keys."""

View file

@ -1,10 +1,18 @@
"""Gemini-related features of the browser."""
import logging
from pathlib import Path
from typing import Optional
from bebop.browser.browser import Browser
from bebop.command_line import CommandLine
from bebop.fs import get_downloads_path
from bebop.fs import (
get_downloads_path, get_identities_path, get_identities_list_path
)
from bebop.identity import (
ClientCertificateException, create_certificate, get_cert_and_key,
get_identities_for_url, load_identities, save_identities
)
from bebop.navigation import set_parameter
from bebop.page import Page
from bebop.protocol import Request, Response
@ -14,7 +22,13 @@ from bebop.tofu import trust_fingerprint, untrust_fingerprint, WRONG_FP_ALERT
MAX_URL_LEN = 1024
def open_gemini_url(browser: Browser, url, redirects=0, use_cache=True):
def open_gemini_url(
browser: Browser,
url: str,
redirects: int =0,
use_cache: bool =True,
cert_and_key=None
) -> Optional[str]:
"""Open a Gemini URL and set the formatted response as content.
While the specification is not set in stone, every client takes a slightly
@ -27,7 +41,7 @@ def open_gemini_url(browser: Browser, url, redirects=0, use_cache=True):
- STATE_INVALID_CERT: the certificate has non-fatal issues; we may
present the user the problems found and let her decide whether to trust
temporarily the certificate or not BUT we currently do not parse the
certificate's fields, so this state is never used.
certificate's fields, not even the pubkey, so this state is never used.
- STATE_UNKNOWN_CERT: the certificate is valid but has not been seen before;
as we're doing TOFU here, we could automatically trust it or let the user
choose. For simplicity, we always trust it permanently.
@ -37,23 +51,38 @@ def open_gemini_url(browser: Browser, url, redirects=0, use_cache=True):
- url: a valid URL with Gemini scheme to open.
- redirects: current amount of redirections done to open the initial URL.
- use_cache: if true, look up if the page is cached before requesting it.
- cert_and_key: if not None, a tuple of paths to a client cert/key to use.
Returns:
True on success, False otherwise.
The final successfully handled URL on success, None otherwise. Redirected
URLs are not returned.
"""
if len(url) >= MAX_URL_LEN:
browser.set_status_error("Request URL too long.")
return
return None
browser.set_status(f"Loading {url}")
loading_message_verb = "Loading" if redirects == 0 else "Redirecting to"
loading_message = f"{loading_message_verb} {url}"
browser.set_status(loading_message)
# If this URL used to request an identity, provide it.
if not cert_and_key:
url_identities = get_identities_for_url(browser.identities, url)
identity = select_identity(url_identities)
if identity:
cert_and_key = get_cert_and_key(identity["id"])
if use_cache and url in browser.cache:
browser.load_page(browser.cache[url])
browser.current_url = url
browser.set_status(url)
return True
return url
req = Request(url, browser.stash)
logging.info(
f"Request {url}"
+ (f" using cert and key {cert_and_key}" if cert_and_key else "")
)
req = Request(url, browser.stash, identity=cert_and_key)
connect_timeout = browser.config["connect_timeout"]
connected = req.connect(connect_timeout)
if not connected:
@ -68,7 +97,7 @@ def open_gemini_url(browser: Browser, url, redirects=0, use_cache=True):
else:
error = f"Connection failed ({url})."
browser.set_status_error(error)
return False
return None
if req.state == Request.STATE_INVALID_CERT:
pass
@ -87,11 +116,11 @@ def open_gemini_url(browser: Browser, url, redirects=0, use_cache=True):
data = req.proceed()
if not data:
browser.set_status_error(f"Server did not respond in time ({url}).")
return False
return None
response = Response.parse(data)
if not response:
browser.set_status_error(f"Server response parsing failed ({url}).")
return False
return None
return _handle_response(browser, response, url, redirects)
@ -116,26 +145,43 @@ def _handle_untrusted_cert(browser: Browser, request: Request):
browser.load_page(alert_page)
def _handle_response(browser: Browser, response: Response, url: str,
redirects: int):
def _handle_response(
browser: Browser,
response: Response,
url: str,
redirects: int
) -> Optional[str]:
"""Handle a response from a Gemini server.
Returns:
True on success, False otherwise.
The final URL on success, None otherwise.
"""
logging.info(f"Response {response.code} {response.meta}")
if response.code == 20:
return _handle_successful_response(browser, response, url)
elif response.generic_code == 30 and response.meta:
browser.open_url(response.meta, base_url=url, redirects=redirects + 1)
# On redirections, we go back to open_url as the redirection may be to
# another protocol. Discard the result of this request.
browser.open_url(
response.meta,
base_url=url,
redirects=redirects + 1
)
elif response.generic_code in (40, 50):
error = f"Server error: {response.meta or Response.code.name}"
browser.set_status_error(error)
elif response.generic_code == 10:
_handle_input_request(browser, url, response.meta)
return _handle_input_request(browser, url, response.meta)
elif response.code == 60:
return _handle_cert_required(browser, response, url, redirects)
elif response.code in (61, 62):
details = response.meta or Response.code.name
error = f"Client certificate error: {details}"
browser.set_status_error(error)
else:
error = f"Unhandled response code {response.code}"
browser.set_status_error(error)
return False
return None
def _handle_successful_response(browser: Browser, response: Response, url: str):
@ -155,7 +201,7 @@ def _handle_successful_response(browser: Browser, response: Response, url: str):
- response: a successful Response.
Returns:
True on success, False otherwise.
The successfully handled URL on success, None otherwise.
"""
# Use appropriate response parser according to the MIME type.
mime_type = response.get_mime_type()
@ -175,7 +221,8 @@ def _handle_successful_response(browser: Browser, response: Response, url: str):
text = response.content.decode("utf-8", errors="replace")
page = Page.from_text(text)
else:
filepath = _get_download_path(url)
download_dir = browser.config["download_path"]
filepath = _get_download_path(url, download_dir=download_dir)
# If a page has been produced, load it. Else if a file has been retrieved,
# download it.
@ -184,7 +231,7 @@ def _handle_successful_response(browser: Browser, response: Response, url: str):
browser.current_url = url
browser.cache[url] = page
browser.set_status(url)
return True
return url
elif filepath:
try:
with open(filepath, "wb") as download_file:
@ -194,26 +241,36 @@ def _handle_successful_response(browser: Browser, response: Response, url: str):
else:
browser.set_status(f"Downloaded {url} ({mime_type.short}).")
browser.last_download = mime_type, filepath
return True
return url
elif error:
browser.set_status_error(error)
return False
return None
def _get_download_path(url: str) -> Path:
def _get_download_path(url: str, download_dir: Optional[str] =None) -> Path:
"""Try to find the best download file path possible from this URL."""
download_dir = get_downloads_path()
download_path = Path(download_dir) if download_dir else get_downloads_path()
if not download_path.exists():
download_path.mkdir(parents=True)
url_parts = url.rsplit("/", maxsplit=1)
if url_parts:
filename = url_parts[-1]
else:
filename = url.split("://")[1] if "://" in url else url
filename = filename.replace("/", "_")
return download_dir / filename
return download_path / filename
def _handle_input_request(browser: Browser, from_url: str, message: str =None):
"""Focus command-line to pass input to the server."""
def _handle_input_request(
browser: Browser,
from_url: str,
message: str =None
) -> Optional[str]:
"""Focus command-line to pass input to the server.
Returns:
The result of `open_gemini_url` with the new request including user input.
"""
if message:
browser.set_status(f"Input needed: {message}")
else:
@ -222,12 +279,86 @@ def _handle_input_request(browser: Browser, from_url: str, message: str =None):
if not user_input:
return
url = set_parameter(from_url, user_input)
open_gemini_url(browser, url)
return open_gemini_url(browser, url)
def _handle_cert_required(
browser: Browser,
response: Response,
url: str,
redirects: int
) -> Optional[str]:
"""Find a matching identity and resend the request with it.
Returns:
The result of `open_gemini_url` with the client certificate provided.
"""
identities = load_identities(get_identities_list_path())
if not identities:
browser.set_status_error(f"Can't load identities.")
return None
browser.identities = identities
url_identities = get_identities_for_url(browser.identities, url)
if not url_identities:
identity = create_identity(browser, url)
if not identity:
return None
browser.identities[url] = [identity]
save_identities(browser.identities, get_identities_list_path())
else:
identity = select_identity(url_identities)
cert_path, key_path = get_cert_and_key(identity["id"])
return open_gemini_url(
browser,
url,
redirects=redirects + 1,
use_cache=False,
cert_and_key=(cert_path, key_path)
)
def select_identity(identities: list):
"""Let user select the appropriate identity among candidates."""
# TODO support multiple identities; for now we just use the first available.
return identities[0] if identities else None
def create_identity(browser: Browser, url: str):
"""Walk the user through identity creation.
Returns:
The created identity on success (already registered in identities
"""
key = browser.prompt("Create client certificate? [y/n]", "yn")
if key != "y":
browser.reset_status()
return None
common_name = browser.get_user_text_input(
"Name? The server will see this, you can leave it empty.",
CommandLine.CHAR_TEXT,
strip=True,
)
if not common_name:
browser.reset_status()
return None
browser.set_status("Generating certificate…")
try:
mangled_name = create_certificate(url, common_name)
except ClientCertificateException as exc:
browser.set_status_error(exc.message)
return None
browser.reset_status()
return {"name": common_name, "id": mangled_name}
def forget_certificate(browser: Browser, hostname: str):
"""Remove the fingerprint associated to this hostname for the cert stash."""
key = browser.prompt(f"Remove fingerprint from {hostname}? [y/N]", "ynN")
key = browser.prompt(f"Remove fingerprint for {hostname}? [y/n]", "yn")
if key != "y":
browser.reset_status()
return

View file

@ -5,6 +5,7 @@ import curses.ascii
import curses.textpad
import os
import tempfile
from typing import Optional
from bebop.external import open_external_program
from bebop.links import Links
@ -38,7 +39,7 @@ class CommandLine:
self.window.clear()
self.window.refresh()
def gather(self):
def gather(self) -> str:
"""Return the string currently written by the user in command line.
This doesn't count the command char used, but it includes then prefix.
@ -46,7 +47,13 @@ class CommandLine:
"""
return self.textbox.gather()[1:].rstrip()
def focus(self, command_char, validator=None, prefix=""):
def focus(
self,
command_char,
validator=None,
prefix="",
escape_to_none=False
) -> Optional[str]:
"""Give user focus to the command bar.
Show the command char and give focus to the command textbox. The
@ -58,10 +65,12 @@ class CommandLine:
- validator: function to use to validate the input chars; if omitted,
`validate_common_input` is used.
- prefix: string to insert before the cursor in the command line.
- escape_to_none: if True, an escape interruption returns None instead
of an empty string.
Returns:
User input as string. The string will be empty if the validator raised
an EscapeInterrupt.
an EscapeInterrupt, unless `escape_to_none` is True.
"""
validator = validator or self._validate_common_input
self.window.clear()
@ -71,7 +80,7 @@ class CommandLine:
try:
command = self.textbox.edit(validator)
except EscapeCommandInterrupt:
command = ""
command = "" if not escape_to_none else None
except TerminateCommandInterrupt as exc:
command = exc.command
else:

View file

@ -1,12 +1,14 @@
"""Config management."""
import json
import logging
import os.path
DEFAULT_CONFIG = {
"connect_timeout": 10,
"text_width": 80,
"download_path": "",
"source_editor": ["vi"],
"command_editor": ["vi"],
"history_limit": 1000,
@ -24,9 +26,9 @@ def load_config(config_path):
with open(config_path, "rt") as config_file:
config = json.load(config_file)
except OSError as exc:
print(f"Could not read config file {config_path}: {exc}")
logging.error(f"Could not read config file {config_path}: {exc}")
except ValueError as exc:
print(f"Could not parse config file {config_path}: {exc}")
logging.error(f"Could not parse config file {config_path}: {exc}")
else:
# Fill missing values with defaults.
for key, value in DEFAULT_CONFIG.items():
@ -41,4 +43,4 @@ def create_default_config(config_path):
with open(config_path, "wt") as config_file:
json.dump(DEFAULT_CONFIG, config_file, indent=2)
except OSError as exc:
print(f"Could not create config file {config_path}: {exc}")
logging.error(f"Could not create config file {config_path}: {exc}")

View file

@ -8,6 +8,7 @@ from functools import lru_cache
from os import getenv
from os.path import expanduser
from pathlib import Path
from typing import Optional
APP_NAME = "bebop"
@ -29,7 +30,7 @@ def get_user_data_path() -> Path:
@lru_cache(None)
def get_downloads_path() -> Path:
"""Return the user downloads directory path."""
"""Return the user downloads directory path (fallbacks to home dir)."""
xdg_config_path = Path(getenv("XDG_CONFIG_HOME", expanduser("~/.config")))
download_path = ""
try:
@ -47,9 +48,36 @@ def get_downloads_path() -> Path:
return Path.home()
def ensure_bebop_files_exist():
"""Ensure various Bebop's files or directories are present."""
@lru_cache(None)
def get_identities_list_path():
"""Return the identities JSON file path."""
return get_user_data_path() / "identities.json"
@lru_cache(None)
def get_identities_path():
"""Return the directory where identities are stored."""
return get_user_data_path() / "identities"
def ensure_bebop_files_exist() -> Optional[str]:
"""Ensure various Bebop's files or directories are present.
Returns:
None if all files and directories are present, an error string otherwise.
"""
try:
# Ensure the user data directory exists.
user_data_path = get_user_data_path()
if not user_data_path.exists():
user_data_path.mkdir(parents=True)
# Ensure the identities file and directory exists.
identities_file_path = get_identities_list_path()
if not identities_file_path.exists():
with open(identities_file_path, "wt") as identities_file:
identities_file.write("{}")
identities_path = get_identities_path()
if not identities_path.exists():
identities_path.mkdir(parents=True)
except OSError as exc:
return str(exc)

View file

@ -46,4 +46,18 @@ with arguments by pressing the corresponding keybind above.
* o/open <url>: open this URL
* forget_certificate <hostname>: remove saved fingerprint for the hostname
* q/quit: well, quit
## Configuration
The current configuration is:
{config_list}
"""
def get_help(config):
config_list = "\n".join(
f"* {key} = {value}"
for key, value in config.items()
)
return HELP_PAGE.format(config_list=config_list)

131
bebop/identity.py Normal file
View file

@ -0,0 +1,131 @@
"""Identity management, i.e. client certificates.
Identities are created when a server requests them for the first time, and saved
with the corresponding URL. The certificate is automatically presented when the
URL is revisited, and all "children" URLs.
Identities are stored on disk as pairs of certificates/keys. URLs are stored in
an identity file, `identities.json`, a simple URL dict that can be looked up for
identities to use, mapped to an ID to identify the cert/key files.
The identity file and the identities dict both have the following format:
``` json
{
"gemini://example.com/app": [
{
"name": "test",
"id": "geminiexamplecomapp-test",
}
]
}
```
"""
import hashlib
import json
import logging
import secrets
import string
import subprocess
from pathlib import Path
from typing import Optional, Union
from bebop.fs import get_identities_path, get_user_data_path
def load_identities(identities_path: Path) -> Optional[dict]:
"""Return saved identities or None on error."""
identities = {}
try:
with open(identities_path, "rt") as identities_file:
identities = json.load(identities_file)
except (OSError, ValueError) as exc:
logging.error(f"Failed to load identities '{identities_path}': {exc}")
return None
return identities
def save_identities(identities: dict, identities_path: Path):
"""Save the certificate stash. Return True on success."""
try:
with open(identities_path, "wt") as identities_file:
json.dump(identities, identities_file)
except (OSError, ValueError) as exc:
logging.error(f"Failed to save identities '{identities_path}': {exc}")
return False
return True
class ClientCertificateException(Exception):
def __init__(self, message: str) -> None:
super().__init__()
self.message = message
def get_identities_for_url(identities: dict, url: str) -> list:
"""For a given URL, return all its identities.
If several URLs are prefixes of the given URL, e.g. we look up
"gemini://host/app/sub" and there are identities for both
"gemini://host/app" and "gemini://host/app/sub", the longest URL's
identities are returned (here the latter).
"""
candidates = [key for key in identities if url.startswith(key)]
if not candidates:
return []
return identities[max(candidates, key=len)]
def get_cert_and_key(cert_id: str):
"""Return the paths of the certificate and key file for this ID."""
directory = get_identities_path()
return directory / f"{cert_id}.crt", directory / f"{cert_id}.key"
def create_certificate(url: str, common_name: str):
"""Create a secure self-signed certificate using system's OpenSSL."""
identities_path = get_identities_path()
mangled_name = get_mangled_name(url, common_name)
cert_path = identities_path / f"{mangled_name}.crt"
key_path = identities_path / f"{mangled_name}.key"
command = [
"openssl", "req",
"-newkey", "rsa:4096",
"-nodes",
"-keyform", "PEM",
"-keyout", str(key_path),
"-x509",
"-days", "28140", # https://www.youtube.com/watch?v=F9L4q-0Pi4E
"-outform", "PEM",
"-out", str(cert_path),
"-subj", f"/CN={common_name}",
]
try:
subprocess.check_call(
command,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
except subprocess.CalledProcessError as exc:
error = "Could not create certificate: " + str(exc)
raise ClientCertificateException(error)
return mangled_name
def get_mangled_name(url: str, common_name: str) -> str:
"""Return a mangled name for the certificate and key files.
This is not obfuscation at all. The mangling is extremely simple and is
just a way to produce names easier on the file system than full URLs.
The mangling is:
`sha256(md5(url) + "-" + common_name + "-" + 8_random_hex_digits)`
with characters that can't be UTF-8 encoded replaced by U+FFFD REPLACEMENT
CHARACTER.
"""
encoded_url = hashlib.md5(url.encode(errors="replace")).hexdigest()
random_hex = hex(secrets.randbits(32))[2:].zfill(8)
name = f"{encoded_url}-{common_name}-{random_hex}"
return hashlib.sha256(name.encode(errors="replace")).hexdigest()

View file

@ -45,11 +45,11 @@ def parse_url(
- absolute: assume the URL is absolute, e.g. in the case we are trying to
parse an URL an user has written, which is most of the time an absolute
URL even if not perfectly so. This only has an effect if, after the
initial parsing, there is no scheme or netloc available.
initial parsing, there is no netloc available.
- default_scheme: specify the scheme to use if the URL either does not
specify it and we need it (e.g. there is a location), or `absolute` is
true; if absolute is true but `default_scheme` is not specified, use the
gemini scheme.
true; if absolute is true but `default_scheme` is not specified, a netloc
marker ("//") is prefixed without scheme.
Returns:
URL parts, as a dictionary with the following keys: "scheme", "netloc",
@ -69,10 +69,12 @@ def parse_url(
for k in ("scheme", "netloc", "path", "query", "fragment")
}
# Smol hack: if we assume it's an absolute URL, just prefix scheme and "//".
if absolute and not parts["scheme"] and not parts["netloc"]:
scheme = default_scheme or "gemini"
return parse_url(scheme + "://" + url)
# Smol hack: if we assume it's an absolute URL and no netloc has been found,
# just prefix default scheme (if any) and "//".
if absolute and not parts["netloc"]:
scheme = parts["scheme"] or default_scheme
prefix = scheme + "://" if scheme else "//"
return parse_url(prefix + url)
# Another smol hack: if there is no scheme, use `default_scheme` as default.
if default_scheme and parts["scheme"] is None:

View file

@ -33,7 +33,10 @@ class PagePad:
if x <= 0 or y <= 0:
return
content_position = self.current_line, self.current_column
try:
self.pad.refresh(*content_position, 0, 0, x, y)
except curses.error:
pass
def scroll_v(self, num_lines: int, window_height: int =0):
"""Make the content pad scroll up and down by num_lines.

View file

@ -58,7 +58,7 @@ class Request:
# Connection failed.
STATE_CONNECTION_FAILED = 7
def __init__(self, url, cert_stash):
def __init__(self, url, cert_stash, identity=None):
self.url = url
self.cert_stash = cert_stash
self.state = Request.STATE_INIT
@ -67,6 +67,7 @@ class Request:
self.ssock = None
self.cert_validation = None
self.error = ""
self.identity = identity
def connect(self, timeout: int) -> bool:
"""Connect to a Gemini server and return a RequestEventType.
@ -120,6 +121,7 @@ class Request:
check the whole cert fingerprint. Here it is considered the same as a
valid certificate.
"""
# Get hostname and port from the URL.
url_parts = GEMINI_URL_RE.match(self.url)
if not url_parts:
self.state = Request.STATE_INVALID_URL
@ -136,6 +138,7 @@ class Request:
port = 1965
self.hostname = hostname
# Prepare the Gemini request.
try:
self.payload = self.url.encode()
except ValueError:
@ -143,6 +146,7 @@ class Request:
return False
self.payload += LINE_TERM
# Connect to the server.
try:
sock = socket.create_connection((hostname, port), timeout=timeout)
except OSError as exc:
@ -150,7 +154,10 @@ class Request:
self.error = exc.strerror
return False
# Setup TLS.
context = Request.get_ssl_context()
if self.identity:
context.load_cert_chain(*self.identity)
try:
self.ssock = context.wrap_socket(sock, server_hostname=hostname)
except OSError as exc:
@ -159,6 +166,7 @@ class Request:
self.error = exc.strerror
return False
# Validate server certificate.
der = self.ssock.getpeercert(binary_form=True)
self.cert_validation = validate_cert(der, hostname, self.cert_stash)
cert_status = self.cert_validation["status"]

View file

@ -0,0 +1,32 @@
import unittest
from ..identity import get_identities_for_url
def get_fake_identity(ident: int):
return {"name": f"test{ident}", "id": f"lol{ident}"}
class TestIdentity(unittest.TestCase):
def test_get_identities_for_url(self):
result = get_identities_for_url({}, "gemini://host/path")
self.assertListEqual(result, [])
identities = {
"gemini://host/path": [get_fake_identity(1)],
"gemini://otherhost/path": [get_fake_identity(2)],
}
result = get_identities_for_url(identities, "gemini://host/path")
self.assertListEqual(result, identities["gemini://host/path"])
result = get_identities_for_url(identities, "gemini://bad/path")
self.assertListEqual(result, [])
identities["gemini://host/path/sub"] = [get_fake_identity(3)]
result = get_identities_for_url(identities, "gemini://host/path/sub")
self.assertListEqual(result, identities["gemini://host/path/sub"])
result = get_identities_for_url(identities, "gemini://host/path/sub/a")
self.assertListEqual(result, identities["gemini://host/path/sub"])
result = get_identities_for_url(identities, "gemini://host/path/sus")
self.assertListEqual(result, identities["gemini://host/path"])

View file

@ -35,7 +35,7 @@ class TestNavigation(unittest.TestCase):
# No scheme nor netloc but we should pretend having an absolute URL.
res = parse_url("dece.space/parse-me.gmi", absolute=True)
self.assertEqual(res["scheme"], "gemini")
self.assertIsNone(res["scheme"])
self.assertEqual(res["netloc"], "dece.space")
self.assertEqual(res["path"], "/parse-me.gmi")

View file

@ -5,6 +5,7 @@ requires more clarity both in specification and in our own implementation.
"""
import hashlib
import logging
import re
from enum import Enum
from pathlib import Path
@ -57,8 +58,8 @@ confident enough before trusting this new certificate.
### How to ensure this new certificate can be trusted?
Can you join the owner through mail or instant messaging? This is the simplest \
way for you to make sure that the server is fine, and maybe alert the owner on \
a problem on his server she did not notice.
way for you to make sure that the server is fine, and maybe alert the server \
owner that there might be an issue.
"""
@ -68,7 +69,7 @@ def get_cert_stash_path() -> Path:
def load_cert_stash(stash_path: Path) -> Optional[Dict]:
"""Load the certificate stash from the file, or None on error.
"""Return the certificate stash from the file, or None on error.
The stash is a dict with host names as keys and tuples as values. Tuples
have four elements:
@ -105,7 +106,7 @@ def save_cert_stash(stash: dict, stash_path: Path):
entry_line = f"{name} {algo} {fingerprint}\n"
stash_file.write(entry_line)
except (OSError, ValueError) as exc:
print(f"Failed to save certificate stash '{stash_path}': {exc}")
logging.error(f"Failed to save certificate stash '{stash_path}': {exc}")
class CertStatus(Enum):