identity: add basic identity management

This commit is contained in:
dece 2021-05-12 22:29:03 +02:00
parent 6ceb75b84c
commit 57f01720d6
12 changed files with 399 additions and 61 deletions

View file

@ -17,8 +17,9 @@ TODO DONE
view history view history
open last download open last download
media files media files
identity management
home page home page
identity management logging
-------------------------------------------------------------------------------- --------------------------------------------------------------------------------
BACKLOG BACKLOG
click on links to open them click on links to open them
@ -37,3 +38,6 @@ response code 11 (if still there)
gopher? gopher?
save history save history
history (forward) (useful?) history (forward) (useful?)
bug: can't reload bebop: pages
bug: can't input unicode
bug: astrobotany loops on /app

View file

@ -19,7 +19,10 @@ def main():
config_path = get_config_path() config_path = get_config_path()
config = load_config(config_path) config = load_config(config_path)
ensure_bebop_files_exist() bebop_files_error = ensure_bebop_files_exist()
if bebop_files_error:
print("Bebop could not create local files:", bebop_files_error)
return
cert_stash_path = get_cert_stash_path() cert_stash_path = get_cert_stash_path()
cert_stash = load_cert_stash(cert_stash_path) or {} cert_stash = load_cert_stash(cert_stash_path) or {}

View file

@ -86,6 +86,11 @@ class Browser:
self._current_url = url self._current_url = url
self.set_status(url) self.set_status(url)
@property
def current_scheme(self):
"""Return the scheme of the current URL."""
return parse_url(self._current_url)["scheme"] or ""
def setup_special_pages(self): def setup_special_pages(self):
"""Return a dict with the special pages functions.""" """Return a dict with the special pages functions."""
return { return {
@ -295,6 +300,15 @@ class Browser:
from bebop.browser.gemini import forget_certificate from bebop.browser.gemini import forget_certificate
forget_certificate(self, words[1]) forget_certificate(self, words[1])
def get_user_text_input(self, status_text, char, prefix="", strip=False):
"""Get user input from the command-line."""
self.set_status(status_text)
result = self.command_line.focus(char, prefix=prefix)
self.reset_status()
if strip:
result = result.strip()
return result
def open_url(self, url, base_url=None, redirects=0, assume_absolute=False, def open_url(self, url, base_url=None, redirects=0, assume_absolute=False,
history=True, use_cache=True): history=True, use_cache=True):
"""Try to open an URL. """Try to open an URL.
@ -318,12 +332,14 @@ class Browser:
self.set_status_error(f"Too many redirections ({url}).") self.set_status_error(f"Too many redirections ({url}).")
return return
current_scheme = self.current_scheme or "gemini"
if assume_absolute or not self.current_url: if assume_absolute or not self.current_url:
parts = parse_url(url, absolute=True, default_scheme="gemini") parts = parse_url(url, absolute=True, default_scheme=current_scheme)
else: else:
parts = parse_url(url) parts = parse_url(url, default_scheme=current_scheme)
if parts["scheme"] is None and parts["netloc"] is None: # If there is a no netloc part, try to join the URL.
if parts["netloc"] is None and parts["scheme"] == current_scheme:
base_url = base_url or self.current_url base_url = base_url or self.current_url
if base_url: if base_url:
parts = parse_url(join_url(base_url, url)) parts = parse_url(join_url(base_url, url))
@ -331,10 +347,10 @@ class Browser:
self.set_status_error(f"Can't open '{url}'.") self.set_status_error(f"Can't open '{url}'.")
return return
# Replace URL passed as parameter by a proper absolute one. # Replace URL passed as parameter by a sanitized one.
url = unparse_url(parts) url = unparse_url(parts)
scheme = parts["scheme"] or "" scheme = parts["scheme"]
if scheme == "gemini": if scheme == "gemini":
from bebop.browser.gemini import open_gemini_url from bebop.browser.gemini import open_gemini_url
success = open_gemini_url( success = open_gemini_url(
@ -523,17 +539,15 @@ class Browser:
"""Add the current URL as bookmark.""" """Add the current URL as bookmark."""
if not self.current_url: if not self.current_url:
return return
self.set_status("Bookmark title?")
current_title = self.page_pad.current_page.title or "" current_title = self.page_pad.current_page.title or ""
title = self.command_line.focus( title = self.get_user_text_input(
"Bookmark title?",
CommandLine.CHAR_TEXT, CommandLine.CHAR_TEXT,
prefix=current_title prefix=current_title,
strip=True,
) )
if title: if title:
title = title.strip() save_bookmark(self.current_url, title)
if title:
save_bookmark(self.current_url, title)
self.reset_status()
def edit_page(self): def edit_page(self):
"""Open a text editor to edit the page source. """Open a text editor to edit the page source.

View file

@ -1,10 +1,17 @@
"""Gemini-related features of the browser.""" """Gemini-related features of the browser."""
from pathlib import Path from pathlib import Path
from typing import Optional
from bebop.browser.browser import Browser from bebop.browser.browser import Browser
from bebop.command_line import CommandLine from bebop.command_line import CommandLine
from bebop.fs import get_downloads_path from bebop.fs import (
get_downloads_path, get_identities_path, get_identities_list_path
)
from bebop.identity import (
ClientCertificateException, create_certificate, get_cert_and_key,
get_identities_for_url, load_identities, save_identities
)
from bebop.navigation import set_parameter from bebop.navigation import set_parameter
from bebop.page import Page from bebop.page import Page
from bebop.protocol import Request, Response from bebop.protocol import Request, Response
@ -14,7 +21,13 @@ from bebop.tofu import trust_fingerprint, untrust_fingerprint, WRONG_FP_ALERT
MAX_URL_LEN = 1024 MAX_URL_LEN = 1024
def open_gemini_url(browser: Browser, url, redirects=0, use_cache=True): def open_gemini_url(
browser: Browser,
url: str,
redirects: int =0,
use_cache: bool =True,
identity=None
) -> Optional[str]:
"""Open a Gemini URL and set the formatted response as content. """Open a Gemini URL and set the formatted response as content.
While the specification is not set in stone, every client takes a slightly While the specification is not set in stone, every client takes a slightly
@ -27,7 +40,7 @@ def open_gemini_url(browser: Browser, url, redirects=0, use_cache=True):
- STATE_INVALID_CERT: the certificate has non-fatal issues; we may - STATE_INVALID_CERT: the certificate has non-fatal issues; we may
present the user the problems found and let her decide whether to trust present the user the problems found and let her decide whether to trust
temporarily the certificate or not BUT we currently do not parse the temporarily the certificate or not BUT we currently do not parse the
certificate's fields, so this state is never used. certificate's fields, not even the pubkey, so this state is never used.
- STATE_UNKNOWN_CERT: the certificate is valid but has not been seen before; - STATE_UNKNOWN_CERT: the certificate is valid but has not been seen before;
as we're doing TOFU here, we could automatically trust it or let the user as we're doing TOFU here, we could automatically trust it or let the user
choose. For simplicity, we always trust it permanently. choose. For simplicity, we always trust it permanently.
@ -37,23 +50,27 @@ def open_gemini_url(browser: Browser, url, redirects=0, use_cache=True):
- url: a valid URL with Gemini scheme to open. - url: a valid URL with Gemini scheme to open.
- redirects: current amount of redirections done to open the initial URL. - redirects: current amount of redirections done to open the initial URL.
- use_cache: if true, look up if the page is cached before requesting it. - use_cache: if true, look up if the page is cached before requesting it.
- identity: if not None, a tuple of paths to a client cert/key to use.
Returns: Returns:
True on success, False otherwise. The final successfully handled URL on success, None otherwise. Redirected
URLs are not returned.
""" """
if len(url) >= MAX_URL_LEN: if len(url) >= MAX_URL_LEN:
browser.set_status_error("Request URL too long.") browser.set_status_error("Request URL too long.")
return return None
browser.set_status(f"Loading {url}") loading_message_verb = "Loading" if redirects == 0 else "Redirecting to"
loading_message = f"{loading_message_verb} {url}"
browser.set_status(loading_message)
if use_cache and url in browser.cache: if use_cache and url in browser.cache:
browser.load_page(browser.cache[url]) browser.load_page(browser.cache[url])
browser.current_url = url browser.current_url = url
browser.set_status(url) browser.set_status(url)
return True return url
req = Request(url, browser.stash) req = Request(url, browser.stash, identity=identity)
connect_timeout = browser.config["connect_timeout"] connect_timeout = browser.config["connect_timeout"]
connected = req.connect(connect_timeout) connected = req.connect(connect_timeout)
if not connected: if not connected:
@ -68,7 +85,7 @@ def open_gemini_url(browser: Browser, url, redirects=0, use_cache=True):
else: else:
error = f"Connection failed ({url})." error = f"Connection failed ({url})."
browser.set_status_error(error) browser.set_status_error(error)
return False return None
if req.state == Request.STATE_INVALID_CERT: if req.state == Request.STATE_INVALID_CERT:
pass pass
@ -87,11 +104,11 @@ def open_gemini_url(browser: Browser, url, redirects=0, use_cache=True):
data = req.proceed() data = req.proceed()
if not data: if not data:
browser.set_status_error(f"Server did not respond in time ({url}).") browser.set_status_error(f"Server did not respond in time ({url}).")
return False return None
response = Response.parse(data) response = Response.parse(data)
if not response: if not response:
browser.set_status_error(f"Server response parsing failed ({url}).") browser.set_status_error(f"Server response parsing failed ({url}).")
return False return None
return _handle_response(browser, response, url, redirects) return _handle_response(browser, response, url, redirects)
@ -116,26 +133,42 @@ def _handle_untrusted_cert(browser: Browser, request: Request):
browser.load_page(alert_page) browser.load_page(alert_page)
def _handle_response(browser: Browser, response: Response, url: str, def _handle_response(
redirects: int): browser: Browser,
response: Response,
url: str,
redirects: int
) -> Optional[str]:
"""Handle a response from a Gemini server. """Handle a response from a Gemini server.
Returns: Returns:
True on success, False otherwise. The final URL on success, None otherwise.
""" """
if response.code == 20: if response.code == 20:
return _handle_successful_response(browser, response, url) return _handle_successful_response(browser, response, url)
elif response.generic_code == 30 and response.meta: elif response.generic_code == 30 and response.meta:
browser.open_url(response.meta, base_url=url, redirects=redirects + 1) # On redirections, we go back to open_url as the redirection may be to
# another protocol. Discard the result of this request.
browser.open_url(
response.meta,
base_url=url,
redirects=redirects + 1
)
elif response.generic_code in (40, 50): elif response.generic_code in (40, 50):
error = f"Server error: {response.meta or Response.code.name}" error = f"Server error: {response.meta or Response.code.name}"
browser.set_status_error(error) browser.set_status_error(error)
elif response.generic_code == 10: elif response.generic_code == 10:
_handle_input_request(browser, url, response.meta) return _handle_input_request(browser, url, response.meta)
elif response.code == 60:
return _handle_cert_required(browser, response, url, redirects)
elif response.code in (61, 62):
details = response.meta or Response.code.name
error = f"Client certificate error: {details}"
browser.set_status_error(error)
else: else:
error = f"Unhandled response code {response.code}" error = f"Unhandled response code {response.code}"
browser.set_status_error(error) browser.set_status_error(error)
return False return None
def _handle_successful_response(browser: Browser, response: Response, url: str): def _handle_successful_response(browser: Browser, response: Response, url: str):
@ -155,7 +188,7 @@ def _handle_successful_response(browser: Browser, response: Response, url: str):
- response: a successful Response. - response: a successful Response.
Returns: Returns:
True on success, False otherwise. The successfully handled URL on success, None otherwise.
""" """
# Use appropriate response parser according to the MIME type. # Use appropriate response parser according to the MIME type.
mime_type = response.get_mime_type() mime_type = response.get_mime_type()
@ -185,7 +218,7 @@ def _handle_successful_response(browser: Browser, response: Response, url: str):
browser.current_url = url browser.current_url = url
browser.cache[url] = page browser.cache[url] = page
browser.set_status(url) browser.set_status(url)
return True return url
elif filepath: elif filepath:
try: try:
with open(filepath, "wb") as download_file: with open(filepath, "wb") as download_file:
@ -195,10 +228,10 @@ def _handle_successful_response(browser: Browser, response: Response, url: str):
else: else:
browser.set_status(f"Downloaded {url} ({mime_type.short}).") browser.set_status(f"Downloaded {url} ({mime_type.short}).")
browser.last_download = mime_type, filepath browser.last_download = mime_type, filepath
return True return url
elif error: elif error:
browser.set_status_error(error) browser.set_status_error(error)
return False return None
def _get_download_path(url: str, download_dir: Optional[str] =None) -> Path: def _get_download_path(url: str, download_dir: Optional[str] =None) -> Path:
@ -215,8 +248,16 @@ def _get_download_path(url: str, download_dir: Optional[str] =None) -> Path:
return download_path / filename return download_path / filename
def _handle_input_request(browser: Browser, from_url: str, message: str =None): def _handle_input_request(
"""Focus command-line to pass input to the server.""" browser: Browser,
from_url: str,
message: str =None
) -> Optional[str]:
"""Focus command-line to pass input to the server.
Returns:
The result of `open_gemini_url` with the new request including user input.
"""
if message: if message:
browser.set_status(f"Input needed: {message}") browser.set_status(f"Input needed: {message}")
else: else:
@ -225,12 +266,81 @@ def _handle_input_request(browser: Browser, from_url: str, message: str =None):
if not user_input: if not user_input:
return return
url = set_parameter(from_url, user_input) url = set_parameter(from_url, user_input)
open_gemini_url(browser, url) return open_gemini_url(browser, url)
def _handle_cert_required(
browser: Browser,
response: Response,
url: str,
redirects: int
) -> Optional[str]:
"""Find a matching identity and resend the request with it.
Returns:
The result of `open_gemini_url` with the client certificate provided.
"""
identities = load_identities(get_identities_list_path())
if isinstance(identities, str):
browser.set_status_error(f"Can't load identities: {identities}")
return None
url_identities = get_identities_for_url(identities, url)
if not url_identities:
identity = create_identity(browser, url)
if not identity:
return None
identities[url] = [identity]
save_identities(identities, get_identities_list_path())
else:
# TODO support multiple identities; for now we just use the first
# available.
identity = url_identities[0]
cert_path, key_path = get_cert_and_key(identity["id"])
return open_gemini_url(
browser,
url,
redirects=redirects + 1,
use_cache=False,
identity=(cert_path, key_path)
)
def create_identity(browser: Browser, url: str):
"""Walk the user through identity creation.
Returns:
The created identity on success (already registered in identities
"""
key = browser.prompt("Create client certificate? [y/n]", "yn")
if key != "y":
browser.reset_status()
return None
common_name = browser.get_user_text_input(
"Name? The server will see this, you can leave it empty.",
CommandLine.CHAR_TEXT,
strip=True,
)
if not common_name:
browser.reset_status()
return None
browser.set_status("Generating certificate…")
try:
mangled_name = create_certificate(url, common_name)
except ClientCertificateException as exc:
browser.set_status_error(exc.message)
return None
browser.reset_status()
return {"name": common_name, "id": mangled_name}
def forget_certificate(browser: Browser, hostname: str): def forget_certificate(browser: Browser, hostname: str):
"""Remove the fingerprint associated to this hostname for the cert stash.""" """Remove the fingerprint associated to this hostname for the cert stash."""
key = browser.prompt(f"Remove fingerprint from {hostname}? [y/N]", "ynN") key = browser.prompt(f"Remove fingerprint for {hostname}? [y/n]", "yn")
if key != "y": if key != "y":
browser.reset_status() browser.reset_status()
return return

View file

@ -5,6 +5,7 @@ import curses.ascii
import curses.textpad import curses.textpad
import os import os
import tempfile import tempfile
from typing import Optional
from bebop.external import open_external_program from bebop.external import open_external_program
from bebop.links import Links from bebop.links import Links
@ -38,7 +39,7 @@ class CommandLine:
self.window.clear() self.window.clear()
self.window.refresh() self.window.refresh()
def gather(self): def gather(self) -> str:
"""Return the string currently written by the user in command line. """Return the string currently written by the user in command line.
This doesn't count the command char used, but it includes then prefix. This doesn't count the command char used, but it includes then prefix.
@ -46,7 +47,13 @@ class CommandLine:
""" """
return self.textbox.gather()[1:].rstrip() return self.textbox.gather()[1:].rstrip()
def focus(self, command_char, validator=None, prefix=""): def focus(
self,
command_char,
validator=None,
prefix="",
escape_to_none=False
) -> Optional[str]:
"""Give user focus to the command bar. """Give user focus to the command bar.
Show the command char and give focus to the command textbox. The Show the command char and give focus to the command textbox. The
@ -58,10 +65,12 @@ class CommandLine:
- validator: function to use to validate the input chars; if omitted, - validator: function to use to validate the input chars; if omitted,
`validate_common_input` is used. `validate_common_input` is used.
- prefix: string to insert before the cursor in the command line. - prefix: string to insert before the cursor in the command line.
- escape_to_none: if True, an escape interruption returns None instead
of an empty string.
Returns: Returns:
User input as string. The string will be empty if the validator raised User input as string. The string will be empty if the validator raised
an EscapeInterrupt. an EscapeInterrupt, unless `escape_to_none` is True.
""" """
validator = validator or self._validate_common_input validator = validator or self._validate_common_input
self.window.clear() self.window.clear()
@ -71,7 +80,7 @@ class CommandLine:
try: try:
command = self.textbox.edit(validator) command = self.textbox.edit(validator)
except EscapeCommandInterrupt: except EscapeCommandInterrupt:
command = "" command = "" if not escape_to_none else None
except TerminateCommandInterrupt as exc: except TerminateCommandInterrupt as exc:
command = exc.command command = exc.command
else: else:

View file

@ -8,6 +8,7 @@ from functools import lru_cache
from os import getenv from os import getenv
from os.path import expanduser from os.path import expanduser
from pathlib import Path from pathlib import Path
from typing import Optional
APP_NAME = "bebop" APP_NAME = "bebop"
@ -47,9 +48,36 @@ def get_downloads_path() -> Path:
return Path.home() return Path.home()
def ensure_bebop_files_exist(): @lru_cache(None)
"""Ensure various Bebop's files or directories are present.""" def get_identities_list_path():
# Ensure the user data directory exists. """Return the identities JSON file path."""
user_data_path = get_user_data_path() return get_user_data_path() / "identities.json"
if not user_data_path.exists():
user_data_path.mkdir(parents=True)
@lru_cache(None)
def get_identities_path():
"""Return the directory where identities are stored."""
return get_user_data_path() / "identities"
def ensure_bebop_files_exist() -> Optional[str]:
"""Ensure various Bebop's files or directories are present.
Returns:
None if all files and directories are present, an error string otherwise.
"""
try:
# Ensure the user data directory exists.
user_data_path = get_user_data_path()
if not user_data_path.exists():
user_data_path.mkdir(parents=True)
# Ensure the identities file and directory exists.
identities_file_path = get_identities_list_path()
if not identities_file_path.exists():
with open(identities_file_path, "wt") as identities_file:
identities_file.write("{}")
identities_path = get_identities_path()
if not identities_path.exists():
identities_path.mkdir(parents=True)
except OSError as exc:
return str(exc)

128
bebop/identity.py Normal file
View file

@ -0,0 +1,128 @@
"""Identity management, i.e. client certificates.
Identities are created when a server requests them for the first time, and saved
with the corresponding URL. The certificate is automatically presented when the
URL is revisited, and all "children" URLs.
Identities are stored on disk as pairs of certificates/keys. URLs are stored in
an identity file, `identities.json`, a simple URL dict that can be looked up for
identities to use, mapped to an ID to identify the cert/key files.
The identity file and the identities dict both have the following format:
``` json
{
"gemini://example.com/app": [
{
"name": "test",
"id": "geminiexamplecomapp-test",
}
]
}
```
"""
import hashlib
import json
import secrets
import string
import subprocess
from pathlib import Path
from typing import Optional, Union
from bebop.fs import get_identities_path, get_user_data_path
def load_identities(identities_path: Path) -> Union[dict, str]:
"""Return saved identities, else an error str."""
identities = {}
try:
with open(identities_path, "rt") as identities_file:
identities = json.load(identities_file)
except (OSError, ValueError) as exc:
return f"Failed to load identities '{identities_path}': {exc}"
return identities
def save_identities(identities: dict, identities_path: Path):
"""Save the certificate stash. Return True on success, else an error str."""
try:
with open(identities_path, "wt") as identities_file:
json.dump(identities, identities_file)
except (OSError, ValueError) as exc:
return f"Failed to save identities '{identities_path}': {exc}"
return True
class ClientCertificateException(Exception):
def __init__(self, message: str) -> None:
super().__init__()
self.message = message
def get_identities_for_url(identities: dict, url: str) -> list:
"""For a given URL, return all its identities.
If several URLs are prefixes of the given URL, e.g. we look up
"gemini://host/app/sub" and there are identities for both
"gemini://host/app" and "gemini://host/app/sub", the longest URL's
identities are returned (here the latter).
"""
candidates = [key for key in identities if url.startswith(key)]
if not candidates:
return []
return identities[max(candidates, key=len)]
def get_cert_and_key(cert_id: str):
"""Return the paths of the certificate and key file for this ID."""
directory = get_identities_path()
return directory / f"{cert_id}.crt", directory / f"{cert_id}.key"
def create_certificate(url: str, common_name: str):
"""Create a secure self-signed certificate using system's OpenSSL."""
identities_path = get_identities_path()
mangled_name = get_mangled_name(url, common_name)
cert_path = identities_path / f"{mangled_name}.crt"
key_path = identities_path / f"{mangled_name}.key"
command = [
"openssl", "req",
"-newkey", "rsa:4096",
"-nodes",
"-keyform", "PEM",
"-keyout", str(key_path),
"-x509",
"-days", "28140", # https://www.youtube.com/watch?v=F9L4q-0Pi4E
"-outform", "PEM",
"-out", str(cert_path),
"-subj", f"/CN={common_name}",
]
try:
subprocess.check_call(
command,
# stdout=subprocess.DEVNULL,
# stderr=subprocess.DEVNULL,
)
except subprocess.CalledProcessError as exc:
error = "Could not create certificate: " + str(exc)
raise ClientCertificateException(error)
return mangled_name
def get_mangled_name(url: str, common_name: str) -> str:
"""Return a mangled name for the certificate and key files.
This is not obfuscation at all. The mangling is extremely simple and is
just a way to produce names easier on the file system than full URLs.
The mangling is:
`sha256(md5(url) + "-" + common_name + "-" + 8_random_hex_digits)`
with characters that can't be UTF-8 encoded replaced by U+FFFD REPLACEMENT
CHARACTER.
"""
encoded_url = hashlib.md5(url.encode(errors="replace")).hexdigest()
random_hex = hex(secrets.randbits(32))[2:].zfill(8)
name = f"{encoded_url}-{common_name}-{random_hex}"
return hashlib.sha256(name.encode(errors="replace")).hexdigest()

View file

@ -45,11 +45,11 @@ def parse_url(
- absolute: assume the URL is absolute, e.g. in the case we are trying to - absolute: assume the URL is absolute, e.g. in the case we are trying to
parse an URL an user has written, which is most of the time an absolute parse an URL an user has written, which is most of the time an absolute
URL even if not perfectly so. This only has an effect if, after the URL even if not perfectly so. This only has an effect if, after the
initial parsing, there is no scheme or netloc available. initial parsing, there is no netloc available.
- default_scheme: specify the scheme to use if the URL either does not - default_scheme: specify the scheme to use if the URL either does not
specify it and we need it (e.g. there is a location), or `absolute` is specify it and we need it (e.g. there is a location), or `absolute` is
true; if absolute is true but `default_scheme` is not specified, use the true; if absolute is true but `default_scheme` is not specified, a netloc
gemini scheme. marker ("//") is prefixed without scheme.
Returns: Returns:
URL parts, as a dictionary with the following keys: "scheme", "netloc", URL parts, as a dictionary with the following keys: "scheme", "netloc",
@ -69,10 +69,12 @@ def parse_url(
for k in ("scheme", "netloc", "path", "query", "fragment") for k in ("scheme", "netloc", "path", "query", "fragment")
} }
# Smol hack: if we assume it's an absolute URL, just prefix scheme and "//". # Smol hack: if we assume it's an absolute URL and no netloc has been found,
if absolute and not parts["scheme"] and not parts["netloc"]: # just prefix default scheme (if any) and "//".
scheme = default_scheme or "gemini" if absolute and not parts["netloc"]:
return parse_url(scheme + "://" + url) scheme = parts["scheme"] or default_scheme
prefix = scheme + "://" if scheme else "//"
return parse_url(prefix + url)
# Another smol hack: if there is no scheme, use `default_scheme` as default. # Another smol hack: if there is no scheme, use `default_scheme` as default.
if default_scheme and parts["scheme"] is None: if default_scheme and parts["scheme"] is None:

View file

@ -58,7 +58,7 @@ class Request:
# Connection failed. # Connection failed.
STATE_CONNECTION_FAILED = 7 STATE_CONNECTION_FAILED = 7
def __init__(self, url, cert_stash): def __init__(self, url, cert_stash, identity=None):
self.url = url self.url = url
self.cert_stash = cert_stash self.cert_stash = cert_stash
self.state = Request.STATE_INIT self.state = Request.STATE_INIT
@ -67,6 +67,7 @@ class Request:
self.ssock = None self.ssock = None
self.cert_validation = None self.cert_validation = None
self.error = "" self.error = ""
self.identity = identity
def connect(self, timeout: int) -> bool: def connect(self, timeout: int) -> bool:
"""Connect to a Gemini server and return a RequestEventType. """Connect to a Gemini server and return a RequestEventType.
@ -120,6 +121,7 @@ class Request:
check the whole cert fingerprint. Here it is considered the same as a check the whole cert fingerprint. Here it is considered the same as a
valid certificate. valid certificate.
""" """
# Get hostname and port from the URL.
url_parts = GEMINI_URL_RE.match(self.url) url_parts = GEMINI_URL_RE.match(self.url)
if not url_parts: if not url_parts:
self.state = Request.STATE_INVALID_URL self.state = Request.STATE_INVALID_URL
@ -136,6 +138,7 @@ class Request:
port = 1965 port = 1965
self.hostname = hostname self.hostname = hostname
# Prepare the Gemini request.
try: try:
self.payload = self.url.encode() self.payload = self.url.encode()
except ValueError: except ValueError:
@ -143,6 +146,7 @@ class Request:
return False return False
self.payload += LINE_TERM self.payload += LINE_TERM
# Connect to the server.
try: try:
sock = socket.create_connection((hostname, port), timeout=timeout) sock = socket.create_connection((hostname, port), timeout=timeout)
except OSError as exc: except OSError as exc:
@ -150,7 +154,10 @@ class Request:
self.error = exc.strerror self.error = exc.strerror
return False return False
# Setup TLS.
context = Request.get_ssl_context() context = Request.get_ssl_context()
if self.identity:
context.load_cert_chain(*self.identity)
try: try:
self.ssock = context.wrap_socket(sock, server_hostname=hostname) self.ssock = context.wrap_socket(sock, server_hostname=hostname)
except OSError as exc: except OSError as exc:
@ -159,6 +166,7 @@ class Request:
self.error = exc.strerror self.error = exc.strerror
return False return False
# Validate server certificate.
der = self.ssock.getpeercert(binary_form=True) der = self.ssock.getpeercert(binary_form=True)
self.cert_validation = validate_cert(der, hostname, self.cert_stash) self.cert_validation = validate_cert(der, hostname, self.cert_stash)
cert_status = self.cert_validation["status"] cert_status = self.cert_validation["status"]

View file

@ -0,0 +1,32 @@
import unittest
from ..identity import get_identities_for_url
def get_fake_identity(ident: int):
return {"name": f"test{ident}", "id": f"lol{ident}"}
class TestIdentity(unittest.TestCase):
def test_get_identities_for_url(self):
result = get_identities_for_url({}, "gemini://host/path")
self.assertListEqual(result, [])
identities = {
"gemini://host/path": [get_fake_identity(1)],
"gemini://otherhost/path": [get_fake_identity(2)],
}
result = get_identities_for_url(identities, "gemini://host/path")
self.assertListEqual(result, identities["gemini://host/path"])
result = get_identities_for_url(identities, "gemini://bad/path")
self.assertListEqual(result, [])
identities["gemini://host/path/sub"] = [get_fake_identity(3)]
result = get_identities_for_url(identities, "gemini://host/path/sub")
self.assertListEqual(result, identities["gemini://host/path/sub"])
result = get_identities_for_url(identities, "gemini://host/path/sub/a")
self.assertListEqual(result, identities["gemini://host/path/sub"])
result = get_identities_for_url(identities, "gemini://host/path/sus")
self.assertListEqual(result, identities["gemini://host/path"])

View file

@ -35,7 +35,7 @@ class TestNavigation(unittest.TestCase):
# No scheme nor netloc but we should pretend having an absolute URL. # No scheme nor netloc but we should pretend having an absolute URL.
res = parse_url("dece.space/parse-me.gmi", absolute=True) res = parse_url("dece.space/parse-me.gmi", absolute=True)
self.assertEqual(res["scheme"], "gemini") self.assertIsNone(res["scheme"])
self.assertEqual(res["netloc"], "dece.space") self.assertEqual(res["netloc"], "dece.space")
self.assertEqual(res["path"], "/parse-me.gmi") self.assertEqual(res["path"], "/parse-me.gmi")

View file

@ -57,8 +57,8 @@ confident enough before trusting this new certificate.
### How to ensure this new certificate can be trusted? ### How to ensure this new certificate can be trusted?
Can you join the owner through mail or instant messaging? This is the simplest \ Can you join the owner through mail or instant messaging? This is the simplest \
way for you to make sure that the server is fine, and maybe alert the owner on \ way for you to make sure that the server is fine, and maybe alert the server \
a problem on his server she did not notice. owner that there might be an issue.
""" """
@ -68,7 +68,7 @@ def get_cert_stash_path() -> Path:
def load_cert_stash(stash_path: Path) -> Optional[Dict]: def load_cert_stash(stash_path: Path) -> Optional[Dict]:
"""Load the certificate stash from the file, or None on error. """Return the certificate stash from the file, or None on error.
The stash is a dict with host names as keys and tuples as values. Tuples The stash is a dict with host names as keys and tuples as values. Tuples
have four elements: have four elements: