This repository has been archived on 2024-08-20. You can view files and clone it, but cannot push or open issues or pull requests.
Bebop/bebop/browser/gemini.py

237 lines
8.4 KiB
Python
Raw Normal View History

2021-03-28 18:28:35 +02:00
"""Gemini-related features of the browser."""
2021-04-17 22:59:54 +02:00
from pathlib import Path
2021-03-28 18:28:35 +02:00
from bebop.browser.browser import Browser
2021-04-19 00:28:20 +02:00
from bebop.command_line import CommandLine
2021-04-17 22:59:54 +02:00
from bebop.fs import get_downloads_path
2021-03-28 18:28:35 +02:00
from bebop.navigation import set_parameter
from bebop.page import Page
from bebop.protocol import Request, Response
2021-04-19 02:04:18 +02:00
from bebop.tofu import trust_fingerprint, untrust_fingerprint, WRONG_FP_ALERT
2021-03-28 18:28:35 +02:00
MAX_URL_LEN = 1024
def open_gemini_url(browser: Browser, url, redirects=0, use_cache=True):
2021-03-28 18:28:35 +02:00
"""Open a Gemini URL and set the formatted response as content.
2021-04-19 02:04:18 +02:00
While the specification is not set in stone, every client takes a slightly
different approach to enforcing TOFU. Read the `Request.connect` docs to
find about cases where connection is aborted without asking the user. What
interests us here is what happens when the user should decide herself? This
happens in several cases, matching the request possible states. Here is
what Bebop do (or want to do):
- STATE_INVALID_CERT: the certificate has non-fatal issues; we may
present the user the problems found and let her decide whether to trust
temporarily the certificate or not BUT we currently do not parse the
certificate's fields, so this state is never used.
- STATE_UNKNOWN_CERT: the certificate is valid but has not been seen before;
as we're doing TOFU here, we could automatically trust it or let the user
choose. For simplicity, we always trust it permanently.
Arguments:
2021-04-19 02:04:18 +02:00
- browser: Browser object making the request.
- url: a valid URL with Gemini scheme to open.
- redirects: current amount of redirections done to open the initial URL.
- use_cache: if true, look up if the page is cached before requesting it.
Returns:
True on success, False otherwise.
2021-03-28 18:28:35 +02:00
"""
if len(url) >= MAX_URL_LEN:
2021-04-19 02:04:18 +02:00
browser.set_status_error("Request URL too long.")
return
2021-03-28 18:28:35 +02:00
browser.set_status(f"Loading {url}")
if use_cache and url in browser.cache:
browser.load_page(browser.cache[url])
browser.current_url = url
browser.set_status(url)
return True
2021-03-28 18:28:35 +02:00
req = Request(url, browser.stash)
connect_timeout = browser.config["connect_timeout"]
connected = req.connect(connect_timeout)
2021-03-28 18:28:35 +02:00
if not connected:
if req.state == Request.STATE_ERROR_CERT:
error = f"Certificate was missing or corrupt ({url})."
elif req.state == Request.STATE_UNTRUSTED_CERT:
2021-04-19 02:04:18 +02:00
_handle_untrusted_cert(browser, req)
2021-03-28 18:28:35 +02:00
error = f"Certificate has been changed ({url})."
elif req.state == Request.STATE_CONNECTION_FAILED:
2021-04-19 00:28:20 +02:00
error_details = ": " + req.error if req.error else "."
2021-03-28 18:28:35 +02:00
error = f"Connection failed ({url})" + error_details
else:
error = f"Connection failed ({url})."
browser.set_status_error(error)
return False
2021-03-28 18:28:35 +02:00
if req.state == Request.STATE_INVALID_CERT:
pass
elif req.state == Request.STATE_UNKNOWN_CERT:
2021-04-19 02:04:18 +02:00
# Certificate is valid but unknown: trust it permanently.
hostname = req.hostname
fingerprint = req.cert_validation["hash"]
trust_fingerprint(
browser.stash,
hostname,
"SHA-512",
fingerprint,
trust_always=True
)
2021-03-28 18:28:35 +02:00
data = req.proceed()
if not data:
browser.set_status_error(f"Server did not respond in time ({url}).")
return False
2021-03-28 18:28:35 +02:00
response = Response.parse(data)
if not response:
browser.set_status_error(f"Server response parsing failed ({url}).")
return False
2021-03-28 18:28:35 +02:00
return _handle_response(browser, response, url, redirects)
2021-04-19 00:28:20 +02:00
2021-04-19 02:04:18 +02:00
def _handle_untrusted_cert(browser: Browser, request: Request):
"""Handle a mismatch between known & server fingerprints.
This function formats an alert page to explain to the user what the hell is
going on and displays it.
"""
remote_fp = request.cert_validation["hash"]
local_fp = request.cert_validation["saved_hash"]
alert_page_source = WRONG_FP_ALERT.format(
hostname=request.hostname,
local_fp=local_fp,
remote_fp=remote_fp,
)
alert_page = Page.from_gemtext(
alert_page_source,
browser.config["text_width"]
)
browser.load_page(alert_page)
2021-04-19 00:28:20 +02:00
def _handle_response(browser: Browser, response: Response, url: str,
redirects: int):
"""Handle a response from a Gemini server.
Returns:
True on success, False otherwise.
"""
2021-03-28 18:28:35 +02:00
if response.code == 20:
return _handle_successful_response(browser, response, url)
2021-03-28 18:28:35 +02:00
elif response.generic_code == 30 and response.meta:
browser.open_url(response.meta, base_url=url, redirects=redirects + 1)
elif response.generic_code in (40, 50):
error = f"Server error: {response.meta or Response.code.name}"
browser.set_status_error(error)
elif response.generic_code == 10:
2021-04-19 00:28:20 +02:00
_handle_input_request(browser, url, response.meta)
2021-03-28 18:28:35 +02:00
else:
error = f"Unhandled response code {response.code}"
browser.set_status_error(error)
return False
2021-03-28 18:28:35 +02:00
def _handle_successful_response(browser: Browser, response: Response, url: str):
2021-04-17 22:59:54 +02:00
"""Handle a successful response content from a Gemini server.
2021-03-28 18:28:35 +02:00
2021-04-17 22:59:54 +02:00
According to the MIME type received or inferred, the response is either
rendered by the browser, or saved to disk. If an error occurs, the browser
displays it.
2021-03-28 18:28:35 +02:00
2021-04-17 22:59:54 +02:00
Only text content is rendered. For Gemini, the encoding specified in the
response is used, if available on the Python distribution. For other text
formats, only UTF-8 is attempted.
2021-03-28 18:28:35 +02:00
Arguments:
2021-04-17 22:59:54 +02:00
- browser: Browser instance that made the initial request.
- url: original URL.
2021-03-28 18:28:35 +02:00
- response: a successful Response.
Returns:
True on success, False otherwise.
2021-03-28 18:28:35 +02:00
"""
# Use appropriate response parser according to the MIME type.
2021-03-28 18:28:35 +02:00
mime_type = response.get_mime_type()
2021-04-17 22:59:54 +02:00
page = None
error = None
filepath = None
2021-03-28 18:28:35 +02:00
if mime_type.main_type == "text":
if mime_type.sub_type == "gemini":
encoding = mime_type.charset
try:
text = response.content.decode(encoding, errors="replace")
except LookupError:
2021-04-17 22:59:54 +02:00
error = f"Unknown encoding {encoding}."
else:
page = Page.from_gemtext(text, browser.config["text_width"])
2021-03-28 18:28:35 +02:00
else:
text = response.content.decode("utf-8", errors="replace")
2021-04-17 22:59:54 +02:00
page = Page.from_text(text)
else:
2021-04-19 00:28:20 +02:00
filepath = _get_download_path(url)
2021-04-17 22:59:54 +02:00
# If a page has been produced, load it. Else if a file has been retrieved,
# download it.
2021-04-17 22:59:54 +02:00
if page:
browser.load_page(page)
browser.current_url = url
browser.cache[url] = page
browser.set_status(url)
return True
2021-04-17 22:59:54 +02:00
elif filepath:
try:
with open(filepath, "wb") as download_file:
download_file.write(response.content)
except OSError as exc:
browser.set_status_error(f"Failed to save {url} ({exc})")
else:
browser.set_status(f"Downloaded {url} ({mime_type.short}).")
return True
2021-04-17 22:59:54 +02:00
elif error:
browser.set_status_error(error)
return False
2021-04-17 22:59:54 +02:00
2021-04-19 00:28:20 +02:00
def _get_download_path(url: str) -> Path:
2021-04-17 22:59:54 +02:00
"""Try to find the best download file path possible from this URL."""
download_dir = get_downloads_path()
url_parts = url.rsplit("/", maxsplit=1)
if url_parts:
filename = url_parts[-1]
2021-03-28 18:28:35 +02:00
else:
2021-04-17 22:59:54 +02:00
filename = url.split("://")[1] if "://" in url else url
filename = filename.replace("/", "_")
return download_dir / filename
2021-03-28 18:28:35 +02:00
2021-04-19 00:28:20 +02:00
def _handle_input_request(browser: Browser, from_url: str, message: str =None):
2021-03-28 18:28:35 +02:00
"""Focus command-line to pass input to the server."""
if message:
browser.set_status(f"Input needed: {message}")
else:
browser.set_status("Input needed:")
2021-04-19 00:28:20 +02:00
user_input = browser.command_line.focus(CommandLine.CHAR_TEXT)
if not user_input:
return
url = set_parameter(from_url, user_input)
open_gemini_url(browser, url)
2021-04-19 02:04:18 +02:00
def forget_certificate(browser: Browser, hostname: str):
"""Remove the fingerprint associated to this hostname for the cert stash."""
key = browser.prompt(f"Remove fingerprint from {hostname}? [y/N]", "ynN")
if key != "y":
browser.reset_status()
return
if untrust_fingerprint(browser.stash, hostname):
browser.set_status(f"Known certificate for {hostname} removed.")
else:
browser.set_status_error(f"Known certificate for {hostname} not found.")