Compare commits
6 commits
5aa03da3e8
...
62997b9385
Author | SHA1 | Date | |
---|---|---|---|
dece | 62997b9385 | ||
dece | f63d875fc3 | ||
dece | 8b1561e689 | ||
dece | 57f01720d6 | ||
dece | 6ceb75b84c | ||
dece | 056616b130 |
20
BOARD.txt
20
BOARD.txt
|
@ -17,12 +17,22 @@ TODO DONE
|
||||||
view history
|
view history
|
||||||
open last download
|
open last download
|
||||||
media files
|
media files
|
||||||
|
identity management
|
||||||
|
logging
|
||||||
home page
|
home page
|
||||||
identity management
|
|
||||||
--------------------------------------------------------------------------------
|
|
||||||
|
|
||||||
BACKLOG
|
BACKLOG
|
||||||
|
--------------------------------------------------------------------------------
|
||||||
|
bug: can't input unicode
|
||||||
click on links to open them
|
click on links to open them
|
||||||
download to disk, not in memory
|
bug: can't reload bebop: pages
|
||||||
|
bug: exiting editor breaks curses
|
||||||
|
dumb rendering mode per site
|
||||||
|
disable cache per site
|
||||||
|
well, preferences per site maybe?
|
||||||
|
download without memory buffer
|
||||||
download in the background
|
download in the background
|
||||||
download view instead of last download
|
download view instead of last download
|
||||||
does encoding really work? cf. egsam
|
does encoding really work? cf. egsam
|
||||||
|
@ -35,5 +45,7 @@ bug: combining chars reduce lengths
|
||||||
non shit command-line
|
non shit command-line
|
||||||
response code 11 (if still there)
|
response code 11 (if still there)
|
||||||
gopher?
|
gopher?
|
||||||
save history
|
opt. maintain history between sessions
|
||||||
history (forward) (useful?)
|
history (forward) (useful?)
|
||||||
|
search in page (ugh)
|
||||||
|
get/set config using command-line
|
||||||
|
|
|
@ -77,6 +77,7 @@ Here are the available options:
|
||||||
|----------------------------|--------------|----------------|---------------------------------------|
|
|----------------------------|--------------|----------------|---------------------------------------|
|
||||||
| `connect_timeout` | int | 10 | Seconds before connection times out. |
|
| `connect_timeout` | int | 10 | Seconds before connection times out. |
|
||||||
| `text_width` | int | 80 | Rendered line length. |
|
| `text_width` | int | 80 | Rendered line length. |
|
||||||
|
| `download_path` | string | | Download path. |
|
||||||
| `source_editor` | string list | `["vi"]` | Command to use for editing sources. |
|
| `source_editor` | string list | `["vi"]` | Command to use for editing sources. |
|
||||||
| `command_editor` | string list | `["vi"]` | Command to use for editing CLI input. |
|
| `command_editor` | string list | `["vi"]` | Command to use for editing CLI input. |
|
||||||
| `history_limit` | int | 1000 | Maximum entries in history. |
|
| `history_limit` | int | 1000 | Maximum entries in history. |
|
||||||
|
|
|
@ -1,4 +1,5 @@
|
||||||
import argparse
|
import argparse
|
||||||
|
import logging
|
||||||
|
|
||||||
from bebop.browser.browser import Browser
|
from bebop.browser.browser import Browser
|
||||||
from bebop.config import load_config
|
from bebop.config import load_config
|
||||||
|
@ -9,8 +10,18 @@ from bebop.tofu import get_cert_stash_path, load_cert_stash, save_cert_stash
|
||||||
def main():
|
def main():
|
||||||
argparser = argparse.ArgumentParser()
|
argparser = argparse.ArgumentParser()
|
||||||
argparser.add_argument("url", nargs="?", default=None)
|
argparser.add_argument("url", nargs="?", default=None)
|
||||||
|
argparser.add_argument("-d", "--debug", action="store_true")
|
||||||
args = argparser.parse_args()
|
args = argparser.parse_args()
|
||||||
|
|
||||||
|
if args.debug:
|
||||||
|
logging.basicConfig(
|
||||||
|
filename="bebop.log",
|
||||||
|
level=logging.DEBUG,
|
||||||
|
format="%(asctime)s %(levelname)-8s %(message)s"
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
logging.disable()
|
||||||
|
|
||||||
if args.url:
|
if args.url:
|
||||||
start_url = args.url
|
start_url = args.url
|
||||||
else:
|
else:
|
||||||
|
@ -19,7 +30,10 @@ def main():
|
||||||
config_path = get_config_path()
|
config_path = get_config_path()
|
||||||
config = load_config(config_path)
|
config = load_config(config_path)
|
||||||
|
|
||||||
ensure_bebop_files_exist()
|
bebop_files_error = ensure_bebop_files_exist()
|
||||||
|
if bebop_files_error:
|
||||||
|
logging.critical(f"Failed to create files: {bebop_files_error}")
|
||||||
|
return
|
||||||
|
|
||||||
cert_stash_path = get_cert_stash_path()
|
cert_stash_path = get_cert_stash_path()
|
||||||
cert_stash = load_cert_stash(cert_stash_path) or {}
|
cert_stash = load_cert_stash(cert_stash_path) or {}
|
||||||
|
|
|
@ -3,12 +3,13 @@
|
||||||
import curses
|
import curses
|
||||||
import curses.ascii
|
import curses.ascii
|
||||||
import curses.textpad
|
import curses.textpad
|
||||||
|
import logging
|
||||||
import os
|
import os
|
||||||
import subprocess
|
import subprocess
|
||||||
import tempfile
|
import tempfile
|
||||||
from math import inf
|
from math import inf
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Optional, Tuple
|
from typing import Dict, Optional, Tuple
|
||||||
|
|
||||||
from bebop.bookmarks import (
|
from bebop.bookmarks import (
|
||||||
get_bookmarks_path, get_bookmarks_document, save_bookmark
|
get_bookmarks_path, get_bookmarks_document, save_bookmark
|
||||||
|
@ -16,8 +17,10 @@ from bebop.bookmarks import (
|
||||||
from bebop.colors import ColorPair, init_colors
|
from bebop.colors import ColorPair, init_colors
|
||||||
from bebop.command_line import CommandLine
|
from bebop.command_line import CommandLine
|
||||||
from bebop.external import open_external_program
|
from bebop.external import open_external_program
|
||||||
from bebop.help import HELP_PAGE
|
from bebop.fs import get_identities_list_path
|
||||||
|
from bebop.help import get_help
|
||||||
from bebop.history import History
|
from bebop.history import History
|
||||||
|
from bebop.identity import load_identities
|
||||||
from bebop.links import Links
|
from bebop.links import Links
|
||||||
from bebop.mime import MimeType
|
from bebop.mime import MimeType
|
||||||
from bebop.mouse import ButtonState
|
from bebop.mouse import ButtonState
|
||||||
|
@ -43,12 +46,13 @@ class Browser:
|
||||||
- status_data: 3-uple of status text, color pair and attributes of the
|
- status_data: 3-uple of status text, color pair and attributes of the
|
||||||
status line, used to reset status after an error.
|
status line, used to reset status after an error.
|
||||||
- history: an History object.
|
- history: an History object.
|
||||||
- cache: a dict containing cached pages
|
- cache: a dict containing cached pages.
|
||||||
- special_pages: a dict containing page names used with "bebop" scheme;
|
- special_pages: a dict containing page names used with "bebop" scheme;
|
||||||
values are dicts as well: the "open" key maps to a callable to use when
|
values are dicts as well: the "open" key maps to a callable to use when
|
||||||
the page is accessed, and the optional "source" key maps to callable
|
the page is accessed, and the optional "source" key maps to callable
|
||||||
returning the page source path.
|
returning the page source path.
|
||||||
- last_download: tuple of MimeType and path, or None.
|
- last_download: tuple of MimeType and path, or None.
|
||||||
|
- identities: identities map.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, config, cert_stash):
|
def __init__(self, config, cert_stash):
|
||||||
|
@ -65,6 +69,7 @@ class Browser:
|
||||||
self.cache = {}
|
self.cache = {}
|
||||||
self.special_pages = self.setup_special_pages()
|
self.special_pages = self.setup_special_pages()
|
||||||
self.last_download: Optional[Tuple[MimeType, Path]] = None
|
self.last_download: Optional[Tuple[MimeType, Path]] = None
|
||||||
|
self.identities = load_identities(get_identities_list_path()) or {}
|
||||||
self._current_url = ""
|
self._current_url = ""
|
||||||
|
|
||||||
@property
|
@property
|
||||||
|
@ -86,6 +91,11 @@ class Browser:
|
||||||
self._current_url = url
|
self._current_url = url
|
||||||
self.set_status(url)
|
self.set_status(url)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def current_scheme(self):
|
||||||
|
"""Return the scheme of the current URL."""
|
||||||
|
return parse_url(self._current_url)["scheme"] or ""
|
||||||
|
|
||||||
def setup_special_pages(self):
|
def setup_special_pages(self):
|
||||||
"""Return a dict with the special pages functions."""
|
"""Return a dict with the special pages functions."""
|
||||||
return {
|
return {
|
||||||
|
@ -104,6 +114,7 @@ class Browser:
|
||||||
def run(self, *args, **kwargs):
|
def run(self, *args, **kwargs):
|
||||||
"""Use curses' wrapper around _run."""
|
"""Use curses' wrapper around _run."""
|
||||||
os.environ.setdefault("ESCDELAY", "25")
|
os.environ.setdefault("ESCDELAY", "25")
|
||||||
|
logging.info("Cursing…")
|
||||||
curses.wrapper(self._run, *args, **kwargs)
|
curses.wrapper(self._run, *args, **kwargs)
|
||||||
|
|
||||||
def _run(self, stdscr, start_url=None):
|
def _run(self, stdscr, start_url=None):
|
||||||
|
@ -250,6 +261,7 @@ class Browser:
|
||||||
def refresh_status_line(self):
|
def refresh_status_line(self):
|
||||||
"""Refresh status line contents."""
|
"""Refresh status line contents."""
|
||||||
text, pair, attributes = self.status_data
|
text, pair, attributes = self.status_data
|
||||||
|
logging.debug("Status: " + text)
|
||||||
text = text[:self.w - 1]
|
text = text[:self.w - 1]
|
||||||
color = curses.color_pair(pair)
|
color = curses.color_pair(pair)
|
||||||
self.status_line.addstr(0, 0, text, color | attributes)
|
self.status_line.addstr(0, 0, text, color | attributes)
|
||||||
|
@ -295,6 +307,15 @@ class Browser:
|
||||||
from bebop.browser.gemini import forget_certificate
|
from bebop.browser.gemini import forget_certificate
|
||||||
forget_certificate(self, words[1])
|
forget_certificate(self, words[1])
|
||||||
|
|
||||||
|
def get_user_text_input(self, status_text, char, prefix="", strip=False):
|
||||||
|
"""Get user input from the command-line."""
|
||||||
|
self.set_status(status_text)
|
||||||
|
result = self.command_line.focus(char, prefix=prefix)
|
||||||
|
self.reset_status()
|
||||||
|
if strip:
|
||||||
|
result = result.strip()
|
||||||
|
return result
|
||||||
|
|
||||||
def open_url(self, url, base_url=None, redirects=0, assume_absolute=False,
|
def open_url(self, url, base_url=None, redirects=0, assume_absolute=False,
|
||||||
history=True, use_cache=True):
|
history=True, use_cache=True):
|
||||||
"""Try to open an URL.
|
"""Try to open an URL.
|
||||||
|
@ -318,12 +339,14 @@ class Browser:
|
||||||
self.set_status_error(f"Too many redirections ({url}).")
|
self.set_status_error(f"Too many redirections ({url}).")
|
||||||
return
|
return
|
||||||
|
|
||||||
|
current_scheme = self.current_scheme or "gemini"
|
||||||
if assume_absolute or not self.current_url:
|
if assume_absolute or not self.current_url:
|
||||||
parts = parse_url(url, absolute=True, default_scheme="gemini")
|
parts = parse_url(url, absolute=True, default_scheme=current_scheme)
|
||||||
else:
|
else:
|
||||||
parts = parse_url(url)
|
parts = parse_url(url, default_scheme=current_scheme)
|
||||||
|
|
||||||
if parts["scheme"] is None and parts["netloc"] is None:
|
# If there is a no netloc part, try to join the URL.
|
||||||
|
if parts["netloc"] is None and parts["scheme"] == current_scheme:
|
||||||
base_url = base_url or self.current_url
|
base_url = base_url or self.current_url
|
||||||
if base_url:
|
if base_url:
|
||||||
parts = parse_url(join_url(base_url, url))
|
parts = parse_url(join_url(base_url, url))
|
||||||
|
@ -331,10 +354,10 @@ class Browser:
|
||||||
self.set_status_error(f"Can't open '{url}'.")
|
self.set_status_error(f"Can't open '{url}'.")
|
||||||
return
|
return
|
||||||
|
|
||||||
# Replace URL passed as parameter by a proper absolute one.
|
# Replace URL passed as parameter by a sanitized one.
|
||||||
url = unparse_url(parts)
|
url = unparse_url(parts)
|
||||||
|
|
||||||
scheme = parts["scheme"] or ""
|
scheme = parts["scheme"]
|
||||||
if scheme == "gemini":
|
if scheme == "gemini":
|
||||||
from bebop.browser.gemini import open_gemini_url
|
from bebop.browser.gemini import open_gemini_url
|
||||||
success = open_gemini_url(
|
success = open_gemini_url(
|
||||||
|
@ -523,17 +546,15 @@ class Browser:
|
||||||
"""Add the current URL as bookmark."""
|
"""Add the current URL as bookmark."""
|
||||||
if not self.current_url:
|
if not self.current_url:
|
||||||
return
|
return
|
||||||
self.set_status("Bookmark title?")
|
|
||||||
current_title = self.page_pad.current_page.title or ""
|
current_title = self.page_pad.current_page.title or ""
|
||||||
title = self.command_line.focus(
|
title = self.get_user_text_input(
|
||||||
|
"Bookmark title?",
|
||||||
CommandLine.CHAR_TEXT,
|
CommandLine.CHAR_TEXT,
|
||||||
prefix=current_title
|
prefix=current_title,
|
||||||
|
strip=True,
|
||||||
)
|
)
|
||||||
if title:
|
|
||||||
title = title.strip()
|
|
||||||
if title:
|
if title:
|
||||||
save_bookmark(self.current_url, title)
|
save_bookmark(self.current_url, title)
|
||||||
self.reset_status()
|
|
||||||
|
|
||||||
def edit_page(self):
|
def edit_page(self):
|
||||||
"""Open a text editor to edit the page source.
|
"""Open a text editor to edit the page source.
|
||||||
|
@ -572,7 +593,7 @@ class Browser:
|
||||||
|
|
||||||
def open_help(self):
|
def open_help(self):
|
||||||
"""Show the help page."""
|
"""Show the help page."""
|
||||||
self.open_internal_page("help", HELP_PAGE)
|
self.open_internal_page("help", get_help(self.config))
|
||||||
|
|
||||||
def prompt(self, text, keys):
|
def prompt(self, text, keys):
|
||||||
"""Display the text and allow it to type one of the given keys."""
|
"""Display the text and allow it to type one of the given keys."""
|
||||||
|
|
|
@ -1,10 +1,18 @@
|
||||||
"""Gemini-related features of the browser."""
|
"""Gemini-related features of the browser."""
|
||||||
|
|
||||||
|
import logging
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
from bebop.browser.browser import Browser
|
from bebop.browser.browser import Browser
|
||||||
from bebop.command_line import CommandLine
|
from bebop.command_line import CommandLine
|
||||||
from bebop.fs import get_downloads_path
|
from bebop.fs import (
|
||||||
|
get_downloads_path, get_identities_path, get_identities_list_path
|
||||||
|
)
|
||||||
|
from bebop.identity import (
|
||||||
|
ClientCertificateException, create_certificate, get_cert_and_key,
|
||||||
|
get_identities_for_url, load_identities, save_identities
|
||||||
|
)
|
||||||
from bebop.navigation import set_parameter
|
from bebop.navigation import set_parameter
|
||||||
from bebop.page import Page
|
from bebop.page import Page
|
||||||
from bebop.protocol import Request, Response
|
from bebop.protocol import Request, Response
|
||||||
|
@ -14,7 +22,13 @@ from bebop.tofu import trust_fingerprint, untrust_fingerprint, WRONG_FP_ALERT
|
||||||
MAX_URL_LEN = 1024
|
MAX_URL_LEN = 1024
|
||||||
|
|
||||||
|
|
||||||
def open_gemini_url(browser: Browser, url, redirects=0, use_cache=True):
|
def open_gemini_url(
|
||||||
|
browser: Browser,
|
||||||
|
url: str,
|
||||||
|
redirects: int =0,
|
||||||
|
use_cache: bool =True,
|
||||||
|
cert_and_key=None
|
||||||
|
) -> Optional[str]:
|
||||||
"""Open a Gemini URL and set the formatted response as content.
|
"""Open a Gemini URL and set the formatted response as content.
|
||||||
|
|
||||||
While the specification is not set in stone, every client takes a slightly
|
While the specification is not set in stone, every client takes a slightly
|
||||||
|
@ -27,7 +41,7 @@ def open_gemini_url(browser: Browser, url, redirects=0, use_cache=True):
|
||||||
- STATE_INVALID_CERT: the certificate has non-fatal issues; we may
|
- STATE_INVALID_CERT: the certificate has non-fatal issues; we may
|
||||||
present the user the problems found and let her decide whether to trust
|
present the user the problems found and let her decide whether to trust
|
||||||
temporarily the certificate or not BUT we currently do not parse the
|
temporarily the certificate or not BUT we currently do not parse the
|
||||||
certificate's fields, so this state is never used.
|
certificate's fields, not even the pubkey, so this state is never used.
|
||||||
- STATE_UNKNOWN_CERT: the certificate is valid but has not been seen before;
|
- STATE_UNKNOWN_CERT: the certificate is valid but has not been seen before;
|
||||||
as we're doing TOFU here, we could automatically trust it or let the user
|
as we're doing TOFU here, we could automatically trust it or let the user
|
||||||
choose. For simplicity, we always trust it permanently.
|
choose. For simplicity, we always trust it permanently.
|
||||||
|
@ -37,23 +51,38 @@ def open_gemini_url(browser: Browser, url, redirects=0, use_cache=True):
|
||||||
- url: a valid URL with Gemini scheme to open.
|
- url: a valid URL with Gemini scheme to open.
|
||||||
- redirects: current amount of redirections done to open the initial URL.
|
- redirects: current amount of redirections done to open the initial URL.
|
||||||
- use_cache: if true, look up if the page is cached before requesting it.
|
- use_cache: if true, look up if the page is cached before requesting it.
|
||||||
|
- cert_and_key: if not None, a tuple of paths to a client cert/key to use.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
True on success, False otherwise.
|
The final successfully handled URL on success, None otherwise. Redirected
|
||||||
|
URLs are not returned.
|
||||||
"""
|
"""
|
||||||
if len(url) >= MAX_URL_LEN:
|
if len(url) >= MAX_URL_LEN:
|
||||||
browser.set_status_error("Request URL too long.")
|
browser.set_status_error("Request URL too long.")
|
||||||
return
|
return None
|
||||||
|
|
||||||
browser.set_status(f"Loading {url}")
|
loading_message_verb = "Loading" if redirects == 0 else "Redirecting to"
|
||||||
|
loading_message = f"{loading_message_verb} {url}…"
|
||||||
|
browser.set_status(loading_message)
|
||||||
|
|
||||||
|
# If this URL used to request an identity, provide it.
|
||||||
|
if not cert_and_key:
|
||||||
|
url_identities = get_identities_for_url(browser.identities, url)
|
||||||
|
identity = select_identity(url_identities)
|
||||||
|
if identity:
|
||||||
|
cert_and_key = get_cert_and_key(identity["id"])
|
||||||
|
|
||||||
if use_cache and url in browser.cache:
|
if use_cache and url in browser.cache:
|
||||||
browser.load_page(browser.cache[url])
|
browser.load_page(browser.cache[url])
|
||||||
browser.current_url = url
|
browser.current_url = url
|
||||||
browser.set_status(url)
|
browser.set_status(url)
|
||||||
return True
|
return url
|
||||||
|
|
||||||
req = Request(url, browser.stash)
|
logging.info(
|
||||||
|
f"Request {url}"
|
||||||
|
+ (f" using cert and key {cert_and_key}" if cert_and_key else "")
|
||||||
|
)
|
||||||
|
req = Request(url, browser.stash, identity=cert_and_key)
|
||||||
connect_timeout = browser.config["connect_timeout"]
|
connect_timeout = browser.config["connect_timeout"]
|
||||||
connected = req.connect(connect_timeout)
|
connected = req.connect(connect_timeout)
|
||||||
if not connected:
|
if not connected:
|
||||||
|
@ -68,7 +97,7 @@ def open_gemini_url(browser: Browser, url, redirects=0, use_cache=True):
|
||||||
else:
|
else:
|
||||||
error = f"Connection failed ({url})."
|
error = f"Connection failed ({url})."
|
||||||
browser.set_status_error(error)
|
browser.set_status_error(error)
|
||||||
return False
|
return None
|
||||||
|
|
||||||
if req.state == Request.STATE_INVALID_CERT:
|
if req.state == Request.STATE_INVALID_CERT:
|
||||||
pass
|
pass
|
||||||
|
@ -87,11 +116,11 @@ def open_gemini_url(browser: Browser, url, redirects=0, use_cache=True):
|
||||||
data = req.proceed()
|
data = req.proceed()
|
||||||
if not data:
|
if not data:
|
||||||
browser.set_status_error(f"Server did not respond in time ({url}).")
|
browser.set_status_error(f"Server did not respond in time ({url}).")
|
||||||
return False
|
return None
|
||||||
response = Response.parse(data)
|
response = Response.parse(data)
|
||||||
if not response:
|
if not response:
|
||||||
browser.set_status_error(f"Server response parsing failed ({url}).")
|
browser.set_status_error(f"Server response parsing failed ({url}).")
|
||||||
return False
|
return None
|
||||||
|
|
||||||
return _handle_response(browser, response, url, redirects)
|
return _handle_response(browser, response, url, redirects)
|
||||||
|
|
||||||
|
@ -116,26 +145,43 @@ def _handle_untrusted_cert(browser: Browser, request: Request):
|
||||||
browser.load_page(alert_page)
|
browser.load_page(alert_page)
|
||||||
|
|
||||||
|
|
||||||
def _handle_response(browser: Browser, response: Response, url: str,
|
def _handle_response(
|
||||||
redirects: int):
|
browser: Browser,
|
||||||
|
response: Response,
|
||||||
|
url: str,
|
||||||
|
redirects: int
|
||||||
|
) -> Optional[str]:
|
||||||
"""Handle a response from a Gemini server.
|
"""Handle a response from a Gemini server.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
True on success, False otherwise.
|
The final URL on success, None otherwise.
|
||||||
"""
|
"""
|
||||||
|
logging.info(f"Response {response.code} {response.meta}")
|
||||||
if response.code == 20:
|
if response.code == 20:
|
||||||
return _handle_successful_response(browser, response, url)
|
return _handle_successful_response(browser, response, url)
|
||||||
elif response.generic_code == 30 and response.meta:
|
elif response.generic_code == 30 and response.meta:
|
||||||
browser.open_url(response.meta, base_url=url, redirects=redirects + 1)
|
# On redirections, we go back to open_url as the redirection may be to
|
||||||
|
# another protocol. Discard the result of this request.
|
||||||
|
browser.open_url(
|
||||||
|
response.meta,
|
||||||
|
base_url=url,
|
||||||
|
redirects=redirects + 1
|
||||||
|
)
|
||||||
elif response.generic_code in (40, 50):
|
elif response.generic_code in (40, 50):
|
||||||
error = f"Server error: {response.meta or Response.code.name}"
|
error = f"Server error: {response.meta or Response.code.name}"
|
||||||
browser.set_status_error(error)
|
browser.set_status_error(error)
|
||||||
elif response.generic_code == 10:
|
elif response.generic_code == 10:
|
||||||
_handle_input_request(browser, url, response.meta)
|
return _handle_input_request(browser, url, response.meta)
|
||||||
|
elif response.code == 60:
|
||||||
|
return _handle_cert_required(browser, response, url, redirects)
|
||||||
|
elif response.code in (61, 62):
|
||||||
|
details = response.meta or Response.code.name
|
||||||
|
error = f"Client certificate error: {details}"
|
||||||
|
browser.set_status_error(error)
|
||||||
else:
|
else:
|
||||||
error = f"Unhandled response code {response.code}"
|
error = f"Unhandled response code {response.code}"
|
||||||
browser.set_status_error(error)
|
browser.set_status_error(error)
|
||||||
return False
|
return None
|
||||||
|
|
||||||
|
|
||||||
def _handle_successful_response(browser: Browser, response: Response, url: str):
|
def _handle_successful_response(browser: Browser, response: Response, url: str):
|
||||||
|
@ -155,7 +201,7 @@ def _handle_successful_response(browser: Browser, response: Response, url: str):
|
||||||
- response: a successful Response.
|
- response: a successful Response.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
True on success, False otherwise.
|
The successfully handled URL on success, None otherwise.
|
||||||
"""
|
"""
|
||||||
# Use appropriate response parser according to the MIME type.
|
# Use appropriate response parser according to the MIME type.
|
||||||
mime_type = response.get_mime_type()
|
mime_type = response.get_mime_type()
|
||||||
|
@ -175,7 +221,8 @@ def _handle_successful_response(browser: Browser, response: Response, url: str):
|
||||||
text = response.content.decode("utf-8", errors="replace")
|
text = response.content.decode("utf-8", errors="replace")
|
||||||
page = Page.from_text(text)
|
page = Page.from_text(text)
|
||||||
else:
|
else:
|
||||||
filepath = _get_download_path(url)
|
download_dir = browser.config["download_path"]
|
||||||
|
filepath = _get_download_path(url, download_dir=download_dir)
|
||||||
|
|
||||||
# If a page has been produced, load it. Else if a file has been retrieved,
|
# If a page has been produced, load it. Else if a file has been retrieved,
|
||||||
# download it.
|
# download it.
|
||||||
|
@ -184,7 +231,7 @@ def _handle_successful_response(browser: Browser, response: Response, url: str):
|
||||||
browser.current_url = url
|
browser.current_url = url
|
||||||
browser.cache[url] = page
|
browser.cache[url] = page
|
||||||
browser.set_status(url)
|
browser.set_status(url)
|
||||||
return True
|
return url
|
||||||
elif filepath:
|
elif filepath:
|
||||||
try:
|
try:
|
||||||
with open(filepath, "wb") as download_file:
|
with open(filepath, "wb") as download_file:
|
||||||
|
@ -194,26 +241,36 @@ def _handle_successful_response(browser: Browser, response: Response, url: str):
|
||||||
else:
|
else:
|
||||||
browser.set_status(f"Downloaded {url} ({mime_type.short}).")
|
browser.set_status(f"Downloaded {url} ({mime_type.short}).")
|
||||||
browser.last_download = mime_type, filepath
|
browser.last_download = mime_type, filepath
|
||||||
return True
|
return url
|
||||||
elif error:
|
elif error:
|
||||||
browser.set_status_error(error)
|
browser.set_status_error(error)
|
||||||
return False
|
return None
|
||||||
|
|
||||||
|
|
||||||
def _get_download_path(url: str) -> Path:
|
def _get_download_path(url: str, download_dir: Optional[str] =None) -> Path:
|
||||||
"""Try to find the best download file path possible from this URL."""
|
"""Try to find the best download file path possible from this URL."""
|
||||||
download_dir = get_downloads_path()
|
download_path = Path(download_dir) if download_dir else get_downloads_path()
|
||||||
|
if not download_path.exists():
|
||||||
|
download_path.mkdir(parents=True)
|
||||||
url_parts = url.rsplit("/", maxsplit=1)
|
url_parts = url.rsplit("/", maxsplit=1)
|
||||||
if url_parts:
|
if url_parts:
|
||||||
filename = url_parts[-1]
|
filename = url_parts[-1]
|
||||||
else:
|
else:
|
||||||
filename = url.split("://")[1] if "://" in url else url
|
filename = url.split("://")[1] if "://" in url else url
|
||||||
filename = filename.replace("/", "_")
|
filename = filename.replace("/", "_")
|
||||||
return download_dir / filename
|
return download_path / filename
|
||||||
|
|
||||||
|
|
||||||
def _handle_input_request(browser: Browser, from_url: str, message: str =None):
|
def _handle_input_request(
|
||||||
"""Focus command-line to pass input to the server."""
|
browser: Browser,
|
||||||
|
from_url: str,
|
||||||
|
message: str =None
|
||||||
|
) -> Optional[str]:
|
||||||
|
"""Focus command-line to pass input to the server.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The result of `open_gemini_url` with the new request including user input.
|
||||||
|
"""
|
||||||
if message:
|
if message:
|
||||||
browser.set_status(f"Input needed: {message}")
|
browser.set_status(f"Input needed: {message}")
|
||||||
else:
|
else:
|
||||||
|
@ -222,12 +279,86 @@ def _handle_input_request(browser: Browser, from_url: str, message: str =None):
|
||||||
if not user_input:
|
if not user_input:
|
||||||
return
|
return
|
||||||
url = set_parameter(from_url, user_input)
|
url = set_parameter(from_url, user_input)
|
||||||
open_gemini_url(browser, url)
|
return open_gemini_url(browser, url)
|
||||||
|
|
||||||
|
|
||||||
|
def _handle_cert_required(
|
||||||
|
browser: Browser,
|
||||||
|
response: Response,
|
||||||
|
url: str,
|
||||||
|
redirects: int
|
||||||
|
) -> Optional[str]:
|
||||||
|
"""Find a matching identity and resend the request with it.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The result of `open_gemini_url` with the client certificate provided.
|
||||||
|
"""
|
||||||
|
identities = load_identities(get_identities_list_path())
|
||||||
|
if not identities:
|
||||||
|
browser.set_status_error(f"Can't load identities.")
|
||||||
|
return None
|
||||||
|
browser.identities = identities
|
||||||
|
|
||||||
|
url_identities = get_identities_for_url(browser.identities, url)
|
||||||
|
if not url_identities:
|
||||||
|
identity = create_identity(browser, url)
|
||||||
|
if not identity:
|
||||||
|
return None
|
||||||
|
browser.identities[url] = [identity]
|
||||||
|
save_identities(browser.identities, get_identities_list_path())
|
||||||
|
else:
|
||||||
|
identity = select_identity(url_identities)
|
||||||
|
|
||||||
|
cert_path, key_path = get_cert_and_key(identity["id"])
|
||||||
|
return open_gemini_url(
|
||||||
|
browser,
|
||||||
|
url,
|
||||||
|
redirects=redirects + 1,
|
||||||
|
use_cache=False,
|
||||||
|
cert_and_key=(cert_path, key_path)
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def select_identity(identities: list):
|
||||||
|
"""Let user select the appropriate identity among candidates."""
|
||||||
|
# TODO support multiple identities; for now we just use the first available.
|
||||||
|
return identities[0] if identities else None
|
||||||
|
|
||||||
|
|
||||||
|
def create_identity(browser: Browser, url: str):
|
||||||
|
"""Walk the user through identity creation.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The created identity on success (already registered in identities
|
||||||
|
"""
|
||||||
|
key = browser.prompt("Create client certificate? [y/n]", "yn")
|
||||||
|
if key != "y":
|
||||||
|
browser.reset_status()
|
||||||
|
return None
|
||||||
|
|
||||||
|
common_name = browser.get_user_text_input(
|
||||||
|
"Name? The server will see this, you can leave it empty.",
|
||||||
|
CommandLine.CHAR_TEXT,
|
||||||
|
strip=True,
|
||||||
|
)
|
||||||
|
if not common_name:
|
||||||
|
browser.reset_status()
|
||||||
|
return None
|
||||||
|
|
||||||
|
browser.set_status("Generating certificate…")
|
||||||
|
try:
|
||||||
|
mangled_name = create_certificate(url, common_name)
|
||||||
|
except ClientCertificateException as exc:
|
||||||
|
browser.set_status_error(exc.message)
|
||||||
|
return None
|
||||||
|
|
||||||
|
browser.reset_status()
|
||||||
|
return {"name": common_name, "id": mangled_name}
|
||||||
|
|
||||||
|
|
||||||
def forget_certificate(browser: Browser, hostname: str):
|
def forget_certificate(browser: Browser, hostname: str):
|
||||||
"""Remove the fingerprint associated to this hostname for the cert stash."""
|
"""Remove the fingerprint associated to this hostname for the cert stash."""
|
||||||
key = browser.prompt(f"Remove fingerprint from {hostname}? [y/N]", "ynN")
|
key = browser.prompt(f"Remove fingerprint for {hostname}? [y/n]", "yn")
|
||||||
if key != "y":
|
if key != "y":
|
||||||
browser.reset_status()
|
browser.reset_status()
|
||||||
return
|
return
|
||||||
|
|
|
@ -5,6 +5,7 @@ import curses.ascii
|
||||||
import curses.textpad
|
import curses.textpad
|
||||||
import os
|
import os
|
||||||
import tempfile
|
import tempfile
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
from bebop.external import open_external_program
|
from bebop.external import open_external_program
|
||||||
from bebop.links import Links
|
from bebop.links import Links
|
||||||
|
@ -38,7 +39,7 @@ class CommandLine:
|
||||||
self.window.clear()
|
self.window.clear()
|
||||||
self.window.refresh()
|
self.window.refresh()
|
||||||
|
|
||||||
def gather(self):
|
def gather(self) -> str:
|
||||||
"""Return the string currently written by the user in command line.
|
"""Return the string currently written by the user in command line.
|
||||||
|
|
||||||
This doesn't count the command char used, but it includes then prefix.
|
This doesn't count the command char used, but it includes then prefix.
|
||||||
|
@ -46,7 +47,13 @@ class CommandLine:
|
||||||
"""
|
"""
|
||||||
return self.textbox.gather()[1:].rstrip()
|
return self.textbox.gather()[1:].rstrip()
|
||||||
|
|
||||||
def focus(self, command_char, validator=None, prefix=""):
|
def focus(
|
||||||
|
self,
|
||||||
|
command_char,
|
||||||
|
validator=None,
|
||||||
|
prefix="",
|
||||||
|
escape_to_none=False
|
||||||
|
) -> Optional[str]:
|
||||||
"""Give user focus to the command bar.
|
"""Give user focus to the command bar.
|
||||||
|
|
||||||
Show the command char and give focus to the command textbox. The
|
Show the command char and give focus to the command textbox. The
|
||||||
|
@ -58,10 +65,12 @@ class CommandLine:
|
||||||
- validator: function to use to validate the input chars; if omitted,
|
- validator: function to use to validate the input chars; if omitted,
|
||||||
`validate_common_input` is used.
|
`validate_common_input` is used.
|
||||||
- prefix: string to insert before the cursor in the command line.
|
- prefix: string to insert before the cursor in the command line.
|
||||||
|
- escape_to_none: if True, an escape interruption returns None instead
|
||||||
|
of an empty string.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
User input as string. The string will be empty if the validator raised
|
User input as string. The string will be empty if the validator raised
|
||||||
an EscapeInterrupt.
|
an EscapeInterrupt, unless `escape_to_none` is True.
|
||||||
"""
|
"""
|
||||||
validator = validator or self._validate_common_input
|
validator = validator or self._validate_common_input
|
||||||
self.window.clear()
|
self.window.clear()
|
||||||
|
@ -71,7 +80,7 @@ class CommandLine:
|
||||||
try:
|
try:
|
||||||
command = self.textbox.edit(validator)
|
command = self.textbox.edit(validator)
|
||||||
except EscapeCommandInterrupt:
|
except EscapeCommandInterrupt:
|
||||||
command = ""
|
command = "" if not escape_to_none else None
|
||||||
except TerminateCommandInterrupt as exc:
|
except TerminateCommandInterrupt as exc:
|
||||||
command = exc.command
|
command = exc.command
|
||||||
else:
|
else:
|
||||||
|
|
|
@ -1,12 +1,14 @@
|
||||||
"""Config management."""
|
"""Config management."""
|
||||||
|
|
||||||
import json
|
import json
|
||||||
|
import logging
|
||||||
import os.path
|
import os.path
|
||||||
|
|
||||||
|
|
||||||
DEFAULT_CONFIG = {
|
DEFAULT_CONFIG = {
|
||||||
"connect_timeout": 10,
|
"connect_timeout": 10,
|
||||||
"text_width": 80,
|
"text_width": 80,
|
||||||
|
"download_path": "",
|
||||||
"source_editor": ["vi"],
|
"source_editor": ["vi"],
|
||||||
"command_editor": ["vi"],
|
"command_editor": ["vi"],
|
||||||
"history_limit": 1000,
|
"history_limit": 1000,
|
||||||
|
@ -24,9 +26,9 @@ def load_config(config_path):
|
||||||
with open(config_path, "rt") as config_file:
|
with open(config_path, "rt") as config_file:
|
||||||
config = json.load(config_file)
|
config = json.load(config_file)
|
||||||
except OSError as exc:
|
except OSError as exc:
|
||||||
print(f"Could not read config file {config_path}: {exc}")
|
logging.error(f"Could not read config file {config_path}: {exc}")
|
||||||
except ValueError as exc:
|
except ValueError as exc:
|
||||||
print(f"Could not parse config file {config_path}: {exc}")
|
logging.error(f"Could not parse config file {config_path}: {exc}")
|
||||||
else:
|
else:
|
||||||
# Fill missing values with defaults.
|
# Fill missing values with defaults.
|
||||||
for key, value in DEFAULT_CONFIG.items():
|
for key, value in DEFAULT_CONFIG.items():
|
||||||
|
@ -41,4 +43,4 @@ def create_default_config(config_path):
|
||||||
with open(config_path, "wt") as config_file:
|
with open(config_path, "wt") as config_file:
|
||||||
json.dump(DEFAULT_CONFIG, config_file, indent=2)
|
json.dump(DEFAULT_CONFIG, config_file, indent=2)
|
||||||
except OSError as exc:
|
except OSError as exc:
|
||||||
print(f"Could not create config file {config_path}: {exc}")
|
logging.error(f"Could not create config file {config_path}: {exc}")
|
||||||
|
|
34
bebop/fs.py
34
bebop/fs.py
|
@ -8,6 +8,7 @@ from functools import lru_cache
|
||||||
from os import getenv
|
from os import getenv
|
||||||
from os.path import expanduser
|
from os.path import expanduser
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
|
||||||
APP_NAME = "bebop"
|
APP_NAME = "bebop"
|
||||||
|
@ -29,7 +30,7 @@ def get_user_data_path() -> Path:
|
||||||
|
|
||||||
@lru_cache(None)
|
@lru_cache(None)
|
||||||
def get_downloads_path() -> Path:
|
def get_downloads_path() -> Path:
|
||||||
"""Return the user downloads directory path."""
|
"""Return the user downloads directory path (fallbacks to home dir)."""
|
||||||
xdg_config_path = Path(getenv("XDG_CONFIG_HOME", expanduser("~/.config")))
|
xdg_config_path = Path(getenv("XDG_CONFIG_HOME", expanduser("~/.config")))
|
||||||
download_path = ""
|
download_path = ""
|
||||||
try:
|
try:
|
||||||
|
@ -47,9 +48,36 @@ def get_downloads_path() -> Path:
|
||||||
return Path.home()
|
return Path.home()
|
||||||
|
|
||||||
|
|
||||||
def ensure_bebop_files_exist():
|
@lru_cache(None)
|
||||||
"""Ensure various Bebop's files or directories are present."""
|
def get_identities_list_path():
|
||||||
|
"""Return the identities JSON file path."""
|
||||||
|
return get_user_data_path() / "identities.json"
|
||||||
|
|
||||||
|
|
||||||
|
@lru_cache(None)
|
||||||
|
def get_identities_path():
|
||||||
|
"""Return the directory where identities are stored."""
|
||||||
|
return get_user_data_path() / "identities"
|
||||||
|
|
||||||
|
|
||||||
|
def ensure_bebop_files_exist() -> Optional[str]:
|
||||||
|
"""Ensure various Bebop's files or directories are present.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
None if all files and directories are present, an error string otherwise.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
# Ensure the user data directory exists.
|
# Ensure the user data directory exists.
|
||||||
user_data_path = get_user_data_path()
|
user_data_path = get_user_data_path()
|
||||||
if not user_data_path.exists():
|
if not user_data_path.exists():
|
||||||
user_data_path.mkdir(parents=True)
|
user_data_path.mkdir(parents=True)
|
||||||
|
# Ensure the identities file and directory exists.
|
||||||
|
identities_file_path = get_identities_list_path()
|
||||||
|
if not identities_file_path.exists():
|
||||||
|
with open(identities_file_path, "wt") as identities_file:
|
||||||
|
identities_file.write("{}")
|
||||||
|
identities_path = get_identities_path()
|
||||||
|
if not identities_path.exists():
|
||||||
|
identities_path.mkdir(parents=True)
|
||||||
|
except OSError as exc:
|
||||||
|
return str(exc)
|
||||||
|
|
|
@ -46,4 +46,18 @@ with arguments by pressing the corresponding keybind above.
|
||||||
* o/open <url>: open this URL
|
* o/open <url>: open this URL
|
||||||
* forget_certificate <hostname>: remove saved fingerprint for the hostname
|
* forget_certificate <hostname>: remove saved fingerprint for the hostname
|
||||||
* q/quit: well, quit
|
* q/quit: well, quit
|
||||||
|
|
||||||
|
## Configuration
|
||||||
|
|
||||||
|
The current configuration is:
|
||||||
|
|
||||||
|
{config_list}
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
def get_help(config):
|
||||||
|
config_list = "\n".join(
|
||||||
|
f"* {key} = {value}"
|
||||||
|
for key, value in config.items()
|
||||||
|
)
|
||||||
|
return HELP_PAGE.format(config_list=config_list)
|
||||||
|
|
131
bebop/identity.py
Normal file
131
bebop/identity.py
Normal file
|
@ -0,0 +1,131 @@
|
||||||
|
"""Identity management, i.e. client certificates.
|
||||||
|
|
||||||
|
Identities are created when a server requests them for the first time, and saved
|
||||||
|
with the corresponding URL. The certificate is automatically presented when the
|
||||||
|
URL is revisited, and all "children" URLs.
|
||||||
|
|
||||||
|
Identities are stored on disk as pairs of certificates/keys. URLs are stored in
|
||||||
|
an identity file, `identities.json`, a simple URL dict that can be looked up for
|
||||||
|
identities to use, mapped to an ID to identify the cert/key files.
|
||||||
|
|
||||||
|
The identity file and the identities dict both have the following format:
|
||||||
|
|
||||||
|
``` json
|
||||||
|
{
|
||||||
|
"gemini://example.com/app": [
|
||||||
|
{
|
||||||
|
"name": "test",
|
||||||
|
"id": "geminiexamplecomapp-test",
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
```
|
||||||
|
"""
|
||||||
|
|
||||||
|
import hashlib
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
import secrets
|
||||||
|
import string
|
||||||
|
import subprocess
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Optional, Union
|
||||||
|
|
||||||
|
from bebop.fs import get_identities_path, get_user_data_path
|
||||||
|
|
||||||
|
|
||||||
|
def load_identities(identities_path: Path) -> Optional[dict]:
|
||||||
|
"""Return saved identities or None on error."""
|
||||||
|
identities = {}
|
||||||
|
try:
|
||||||
|
with open(identities_path, "rt") as identities_file:
|
||||||
|
identities = json.load(identities_file)
|
||||||
|
except (OSError, ValueError) as exc:
|
||||||
|
logging.error(f"Failed to load identities '{identities_path}': {exc}")
|
||||||
|
return None
|
||||||
|
return identities
|
||||||
|
|
||||||
|
|
||||||
|
def save_identities(identities: dict, identities_path: Path):
|
||||||
|
"""Save the certificate stash. Return True on success."""
|
||||||
|
try:
|
||||||
|
with open(identities_path, "wt") as identities_file:
|
||||||
|
json.dump(identities, identities_file)
|
||||||
|
except (OSError, ValueError) as exc:
|
||||||
|
logging.error(f"Failed to save identities '{identities_path}': {exc}")
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
class ClientCertificateException(Exception):
|
||||||
|
|
||||||
|
def __init__(self, message: str) -> None:
|
||||||
|
super().__init__()
|
||||||
|
self.message = message
|
||||||
|
|
||||||
|
|
||||||
|
def get_identities_for_url(identities: dict, url: str) -> list:
|
||||||
|
"""For a given URL, return all its identities.
|
||||||
|
|
||||||
|
If several URLs are prefixes of the given URL, e.g. we look up
|
||||||
|
"gemini://host/app/sub" and there are identities for both
|
||||||
|
"gemini://host/app" and "gemini://host/app/sub", the longest URL's
|
||||||
|
identities are returned (here the latter).
|
||||||
|
"""
|
||||||
|
candidates = [key for key in identities if url.startswith(key)]
|
||||||
|
if not candidates:
|
||||||
|
return []
|
||||||
|
return identities[max(candidates, key=len)]
|
||||||
|
|
||||||
|
|
||||||
|
def get_cert_and_key(cert_id: str):
|
||||||
|
"""Return the paths of the certificate and key file for this ID."""
|
||||||
|
directory = get_identities_path()
|
||||||
|
return directory / f"{cert_id}.crt", directory / f"{cert_id}.key"
|
||||||
|
|
||||||
|
|
||||||
|
def create_certificate(url: str, common_name: str):
|
||||||
|
"""Create a secure self-signed certificate using system's OpenSSL."""
|
||||||
|
identities_path = get_identities_path()
|
||||||
|
mangled_name = get_mangled_name(url, common_name)
|
||||||
|
cert_path = identities_path / f"{mangled_name}.crt"
|
||||||
|
key_path = identities_path / f"{mangled_name}.key"
|
||||||
|
command = [
|
||||||
|
"openssl", "req",
|
||||||
|
"-newkey", "rsa:4096",
|
||||||
|
"-nodes",
|
||||||
|
"-keyform", "PEM",
|
||||||
|
"-keyout", str(key_path),
|
||||||
|
"-x509",
|
||||||
|
"-days", "28140", # https://www.youtube.com/watch?v=F9L4q-0Pi4E
|
||||||
|
"-outform", "PEM",
|
||||||
|
"-out", str(cert_path),
|
||||||
|
"-subj", f"/CN={common_name}",
|
||||||
|
]
|
||||||
|
try:
|
||||||
|
subprocess.check_call(
|
||||||
|
command,
|
||||||
|
stdout=subprocess.DEVNULL,
|
||||||
|
stderr=subprocess.DEVNULL,
|
||||||
|
)
|
||||||
|
except subprocess.CalledProcessError as exc:
|
||||||
|
error = "Could not create certificate: " + str(exc)
|
||||||
|
raise ClientCertificateException(error)
|
||||||
|
return mangled_name
|
||||||
|
|
||||||
|
|
||||||
|
def get_mangled_name(url: str, common_name: str) -> str:
|
||||||
|
"""Return a mangled name for the certificate and key files.
|
||||||
|
|
||||||
|
This is not obfuscation at all. The mangling is extremely simple and is
|
||||||
|
just a way to produce names easier on the file system than full URLs.
|
||||||
|
|
||||||
|
The mangling is:
|
||||||
|
`sha256(md5(url) + "-" + common_name + "-" + 8_random_hex_digits)`
|
||||||
|
with characters that can't be UTF-8 encoded replaced by U+FFFD REPLACEMENT
|
||||||
|
CHARACTER.
|
||||||
|
"""
|
||||||
|
encoded_url = hashlib.md5(url.encode(errors="replace")).hexdigest()
|
||||||
|
random_hex = hex(secrets.randbits(32))[2:].zfill(8)
|
||||||
|
name = f"{encoded_url}-{common_name}-{random_hex}"
|
||||||
|
return hashlib.sha256(name.encode(errors="replace")).hexdigest()
|
|
@ -45,11 +45,11 @@ def parse_url(
|
||||||
- absolute: assume the URL is absolute, e.g. in the case we are trying to
|
- absolute: assume the URL is absolute, e.g. in the case we are trying to
|
||||||
parse an URL an user has written, which is most of the time an absolute
|
parse an URL an user has written, which is most of the time an absolute
|
||||||
URL even if not perfectly so. This only has an effect if, after the
|
URL even if not perfectly so. This only has an effect if, after the
|
||||||
initial parsing, there is no scheme or netloc available.
|
initial parsing, there is no netloc available.
|
||||||
- default_scheme: specify the scheme to use if the URL either does not
|
- default_scheme: specify the scheme to use if the URL either does not
|
||||||
specify it and we need it (e.g. there is a location), or `absolute` is
|
specify it and we need it (e.g. there is a location), or `absolute` is
|
||||||
true; if absolute is true but `default_scheme` is not specified, use the
|
true; if absolute is true but `default_scheme` is not specified, a netloc
|
||||||
gemini scheme.
|
marker ("//") is prefixed without scheme.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
URL parts, as a dictionary with the following keys: "scheme", "netloc",
|
URL parts, as a dictionary with the following keys: "scheme", "netloc",
|
||||||
|
@ -69,10 +69,12 @@ def parse_url(
|
||||||
for k in ("scheme", "netloc", "path", "query", "fragment")
|
for k in ("scheme", "netloc", "path", "query", "fragment")
|
||||||
}
|
}
|
||||||
|
|
||||||
# Smol hack: if we assume it's an absolute URL, just prefix scheme and "//".
|
# Smol hack: if we assume it's an absolute URL and no netloc has been found,
|
||||||
if absolute and not parts["scheme"] and not parts["netloc"]:
|
# just prefix default scheme (if any) and "//".
|
||||||
scheme = default_scheme or "gemini"
|
if absolute and not parts["netloc"]:
|
||||||
return parse_url(scheme + "://" + url)
|
scheme = parts["scheme"] or default_scheme
|
||||||
|
prefix = scheme + "://" if scheme else "//"
|
||||||
|
return parse_url(prefix + url)
|
||||||
|
|
||||||
# Another smol hack: if there is no scheme, use `default_scheme` as default.
|
# Another smol hack: if there is no scheme, use `default_scheme` as default.
|
||||||
if default_scheme and parts["scheme"] is None:
|
if default_scheme and parts["scheme"] is None:
|
||||||
|
|
|
@ -33,7 +33,10 @@ class PagePad:
|
||||||
if x <= 0 or y <= 0:
|
if x <= 0 or y <= 0:
|
||||||
return
|
return
|
||||||
content_position = self.current_line, self.current_column
|
content_position = self.current_line, self.current_column
|
||||||
|
try:
|
||||||
self.pad.refresh(*content_position, 0, 0, x, y)
|
self.pad.refresh(*content_position, 0, 0, x, y)
|
||||||
|
except curses.error:
|
||||||
|
pass
|
||||||
|
|
||||||
def scroll_v(self, num_lines: int, window_height: int =0):
|
def scroll_v(self, num_lines: int, window_height: int =0):
|
||||||
"""Make the content pad scroll up and down by num_lines.
|
"""Make the content pad scroll up and down by num_lines.
|
||||||
|
|
|
@ -58,7 +58,7 @@ class Request:
|
||||||
# Connection failed.
|
# Connection failed.
|
||||||
STATE_CONNECTION_FAILED = 7
|
STATE_CONNECTION_FAILED = 7
|
||||||
|
|
||||||
def __init__(self, url, cert_stash):
|
def __init__(self, url, cert_stash, identity=None):
|
||||||
self.url = url
|
self.url = url
|
||||||
self.cert_stash = cert_stash
|
self.cert_stash = cert_stash
|
||||||
self.state = Request.STATE_INIT
|
self.state = Request.STATE_INIT
|
||||||
|
@ -67,6 +67,7 @@ class Request:
|
||||||
self.ssock = None
|
self.ssock = None
|
||||||
self.cert_validation = None
|
self.cert_validation = None
|
||||||
self.error = ""
|
self.error = ""
|
||||||
|
self.identity = identity
|
||||||
|
|
||||||
def connect(self, timeout: int) -> bool:
|
def connect(self, timeout: int) -> bool:
|
||||||
"""Connect to a Gemini server and return a RequestEventType.
|
"""Connect to a Gemini server and return a RequestEventType.
|
||||||
|
@ -120,6 +121,7 @@ class Request:
|
||||||
check the whole cert fingerprint. Here it is considered the same as a
|
check the whole cert fingerprint. Here it is considered the same as a
|
||||||
valid certificate.
|
valid certificate.
|
||||||
"""
|
"""
|
||||||
|
# Get hostname and port from the URL.
|
||||||
url_parts = GEMINI_URL_RE.match(self.url)
|
url_parts = GEMINI_URL_RE.match(self.url)
|
||||||
if not url_parts:
|
if not url_parts:
|
||||||
self.state = Request.STATE_INVALID_URL
|
self.state = Request.STATE_INVALID_URL
|
||||||
|
@ -136,6 +138,7 @@ class Request:
|
||||||
port = 1965
|
port = 1965
|
||||||
self.hostname = hostname
|
self.hostname = hostname
|
||||||
|
|
||||||
|
# Prepare the Gemini request.
|
||||||
try:
|
try:
|
||||||
self.payload = self.url.encode()
|
self.payload = self.url.encode()
|
||||||
except ValueError:
|
except ValueError:
|
||||||
|
@ -143,6 +146,7 @@ class Request:
|
||||||
return False
|
return False
|
||||||
self.payload += LINE_TERM
|
self.payload += LINE_TERM
|
||||||
|
|
||||||
|
# Connect to the server.
|
||||||
try:
|
try:
|
||||||
sock = socket.create_connection((hostname, port), timeout=timeout)
|
sock = socket.create_connection((hostname, port), timeout=timeout)
|
||||||
except OSError as exc:
|
except OSError as exc:
|
||||||
|
@ -150,7 +154,10 @@ class Request:
|
||||||
self.error = exc.strerror
|
self.error = exc.strerror
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
# Setup TLS.
|
||||||
context = Request.get_ssl_context()
|
context = Request.get_ssl_context()
|
||||||
|
if self.identity:
|
||||||
|
context.load_cert_chain(*self.identity)
|
||||||
try:
|
try:
|
||||||
self.ssock = context.wrap_socket(sock, server_hostname=hostname)
|
self.ssock = context.wrap_socket(sock, server_hostname=hostname)
|
||||||
except OSError as exc:
|
except OSError as exc:
|
||||||
|
@ -159,6 +166,7 @@ class Request:
|
||||||
self.error = exc.strerror
|
self.error = exc.strerror
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
# Validate server certificate.
|
||||||
der = self.ssock.getpeercert(binary_form=True)
|
der = self.ssock.getpeercert(binary_form=True)
|
||||||
self.cert_validation = validate_cert(der, hostname, self.cert_stash)
|
self.cert_validation = validate_cert(der, hostname, self.cert_stash)
|
||||||
cert_status = self.cert_validation["status"]
|
cert_status = self.cert_validation["status"]
|
||||||
|
|
32
bebop/tests/test_identity.py
Normal file
32
bebop/tests/test_identity.py
Normal file
|
@ -0,0 +1,32 @@
|
||||||
|
import unittest
|
||||||
|
|
||||||
|
from ..identity import get_identities_for_url
|
||||||
|
|
||||||
|
|
||||||
|
def get_fake_identity(ident: int):
|
||||||
|
return {"name": f"test{ident}", "id": f"lol{ident}"}
|
||||||
|
|
||||||
|
|
||||||
|
class TestIdentity(unittest.TestCase):
|
||||||
|
|
||||||
|
def test_get_identities_for_url(self):
|
||||||
|
result = get_identities_for_url({}, "gemini://host/path")
|
||||||
|
self.assertListEqual(result, [])
|
||||||
|
|
||||||
|
identities = {
|
||||||
|
"gemini://host/path": [get_fake_identity(1)],
|
||||||
|
"gemini://otherhost/path": [get_fake_identity(2)],
|
||||||
|
}
|
||||||
|
|
||||||
|
result = get_identities_for_url(identities, "gemini://host/path")
|
||||||
|
self.assertListEqual(result, identities["gemini://host/path"])
|
||||||
|
result = get_identities_for_url(identities, "gemini://bad/path")
|
||||||
|
self.assertListEqual(result, [])
|
||||||
|
|
||||||
|
identities["gemini://host/path/sub"] = [get_fake_identity(3)]
|
||||||
|
result = get_identities_for_url(identities, "gemini://host/path/sub")
|
||||||
|
self.assertListEqual(result, identities["gemini://host/path/sub"])
|
||||||
|
result = get_identities_for_url(identities, "gemini://host/path/sub/a")
|
||||||
|
self.assertListEqual(result, identities["gemini://host/path/sub"])
|
||||||
|
result = get_identities_for_url(identities, "gemini://host/path/sus")
|
||||||
|
self.assertListEqual(result, identities["gemini://host/path"])
|
|
@ -35,7 +35,7 @@ class TestNavigation(unittest.TestCase):
|
||||||
|
|
||||||
# No scheme nor netloc but we should pretend having an absolute URL.
|
# No scheme nor netloc but we should pretend having an absolute URL.
|
||||||
res = parse_url("dece.space/parse-me.gmi", absolute=True)
|
res = parse_url("dece.space/parse-me.gmi", absolute=True)
|
||||||
self.assertEqual(res["scheme"], "gemini")
|
self.assertIsNone(res["scheme"])
|
||||||
self.assertEqual(res["netloc"], "dece.space")
|
self.assertEqual(res["netloc"], "dece.space")
|
||||||
self.assertEqual(res["path"], "/parse-me.gmi")
|
self.assertEqual(res["path"], "/parse-me.gmi")
|
||||||
|
|
||||||
|
|
|
@ -5,6 +5,7 @@ requires more clarity both in specification and in our own implementation.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import hashlib
|
import hashlib
|
||||||
|
import logging
|
||||||
import re
|
import re
|
||||||
from enum import Enum
|
from enum import Enum
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
@ -57,8 +58,8 @@ confident enough before trusting this new certificate.
|
||||||
### How to ensure this new certificate can be trusted?
|
### How to ensure this new certificate can be trusted?
|
||||||
|
|
||||||
Can you join the owner through mail or instant messaging? This is the simplest \
|
Can you join the owner through mail or instant messaging? This is the simplest \
|
||||||
way for you to make sure that the server is fine, and maybe alert the owner on \
|
way for you to make sure that the server is fine, and maybe alert the server \
|
||||||
a problem on his server she did not notice.
|
owner that there might be an issue.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
@ -68,7 +69,7 @@ def get_cert_stash_path() -> Path:
|
||||||
|
|
||||||
|
|
||||||
def load_cert_stash(stash_path: Path) -> Optional[Dict]:
|
def load_cert_stash(stash_path: Path) -> Optional[Dict]:
|
||||||
"""Load the certificate stash from the file, or None on error.
|
"""Return the certificate stash from the file, or None on error.
|
||||||
|
|
||||||
The stash is a dict with host names as keys and tuples as values. Tuples
|
The stash is a dict with host names as keys and tuples as values. Tuples
|
||||||
have four elements:
|
have four elements:
|
||||||
|
@ -105,7 +106,7 @@ def save_cert_stash(stash: dict, stash_path: Path):
|
||||||
entry_line = f"{name} {algo} {fingerprint}\n"
|
entry_line = f"{name} {algo} {fingerprint}\n"
|
||||||
stash_file.write(entry_line)
|
stash_file.write(entry_line)
|
||||||
except (OSError, ValueError) as exc:
|
except (OSError, ValueError) as exc:
|
||||||
print(f"Failed to save certificate stash '{stash_path}': {exc}")
|
logging.error(f"Failed to save certificate stash '{stash_path}': {exc}")
|
||||||
|
|
||||||
|
|
||||||
class CertStatus(Enum):
|
class CertStatus(Enum):
|
||||||
|
|
Reference in a new issue