Compare commits
No commits in common. "62997b9385fccece83636d85d2ecb8e754701536" and "5aa03da3e8a1d5626ad04acd9f5df190d75f5ac3" have entirely different histories.
62997b9385
...
5aa03da3e8
20
BOARD.txt
20
BOARD.txt
|
@ -17,22 +17,12 @@ TODO DONE
|
|||
view history
|
||||
open last download
|
||||
media files
|
||||
identity management
|
||||
logging
|
||||
home page
|
||||
|
||||
|
||||
|
||||
BACKLOG
|
||||
identity management
|
||||
--------------------------------------------------------------------------------
|
||||
bug: can't input unicode
|
||||
BACKLOG
|
||||
click on links to open them
|
||||
bug: can't reload bebop: pages
|
||||
bug: exiting editor breaks curses
|
||||
dumb rendering mode per site
|
||||
disable cache per site
|
||||
well, preferences per site maybe?
|
||||
download without memory buffer
|
||||
download to disk, not in memory
|
||||
download in the background
|
||||
download view instead of last download
|
||||
does encoding really work? cf. egsam
|
||||
|
@ -45,7 +35,5 @@ bug: combining chars reduce lengths
|
|||
non shit command-line
|
||||
response code 11 (if still there)
|
||||
gopher?
|
||||
opt. maintain history between sessions
|
||||
save history
|
||||
history (forward) (useful?)
|
||||
search in page (ugh)
|
||||
get/set config using command-line
|
||||
|
|
|
@ -77,7 +77,6 @@ Here are the available options:
|
|||
|----------------------------|--------------|----------------|---------------------------------------|
|
||||
| `connect_timeout` | int | 10 | Seconds before connection times out. |
|
||||
| `text_width` | int | 80 | Rendered line length. |
|
||||
| `download_path` | string | | Download path. |
|
||||
| `source_editor` | string list | `["vi"]` | Command to use for editing sources. |
|
||||
| `command_editor` | string list | `["vi"]` | Command to use for editing CLI input. |
|
||||
| `history_limit` | int | 1000 | Maximum entries in history. |
|
||||
|
|
|
@ -1,5 +1,4 @@
|
|||
import argparse
|
||||
import logging
|
||||
|
||||
from bebop.browser.browser import Browser
|
||||
from bebop.config import load_config
|
||||
|
@ -10,18 +9,8 @@ from bebop.tofu import get_cert_stash_path, load_cert_stash, save_cert_stash
|
|||
def main():
|
||||
argparser = argparse.ArgumentParser()
|
||||
argparser.add_argument("url", nargs="?", default=None)
|
||||
argparser.add_argument("-d", "--debug", action="store_true")
|
||||
args = argparser.parse_args()
|
||||
|
||||
if args.debug:
|
||||
logging.basicConfig(
|
||||
filename="bebop.log",
|
||||
level=logging.DEBUG,
|
||||
format="%(asctime)s %(levelname)-8s %(message)s"
|
||||
)
|
||||
else:
|
||||
logging.disable()
|
||||
|
||||
if args.url:
|
||||
start_url = args.url
|
||||
else:
|
||||
|
@ -30,10 +19,7 @@ def main():
|
|||
config_path = get_config_path()
|
||||
config = load_config(config_path)
|
||||
|
||||
bebop_files_error = ensure_bebop_files_exist()
|
||||
if bebop_files_error:
|
||||
logging.critical(f"Failed to create files: {bebop_files_error}")
|
||||
return
|
||||
ensure_bebop_files_exist()
|
||||
|
||||
cert_stash_path = get_cert_stash_path()
|
||||
cert_stash = load_cert_stash(cert_stash_path) or {}
|
||||
|
|
|
@ -3,13 +3,12 @@
|
|||
import curses
|
||||
import curses.ascii
|
||||
import curses.textpad
|
||||
import logging
|
||||
import os
|
||||
import subprocess
|
||||
import tempfile
|
||||
from math import inf
|
||||
from pathlib import Path
|
||||
from typing import Dict, Optional, Tuple
|
||||
from typing import Optional, Tuple
|
||||
|
||||
from bebop.bookmarks import (
|
||||
get_bookmarks_path, get_bookmarks_document, save_bookmark
|
||||
|
@ -17,10 +16,8 @@ from bebop.bookmarks import (
|
|||
from bebop.colors import ColorPair, init_colors
|
||||
from bebop.command_line import CommandLine
|
||||
from bebop.external import open_external_program
|
||||
from bebop.fs import get_identities_list_path
|
||||
from bebop.help import get_help
|
||||
from bebop.help import HELP_PAGE
|
||||
from bebop.history import History
|
||||
from bebop.identity import load_identities
|
||||
from bebop.links import Links
|
||||
from bebop.mime import MimeType
|
||||
from bebop.mouse import ButtonState
|
||||
|
@ -44,15 +41,14 @@ class Browser:
|
|||
- command_line: a CommandLine object for the user to interact with.
|
||||
- running: the browser will continue running while this is true.
|
||||
- status_data: 3-uple of status text, color pair and attributes of the
|
||||
status line, used to reset status after an error.
|
||||
status line, used to reset status after an error.
|
||||
- history: an History object.
|
||||
- cache: a dict containing cached pages.
|
||||
- cache: a dict containing cached pages
|
||||
- special_pages: a dict containing page names used with "bebop" scheme;
|
||||
values are dicts as well: the "open" key maps to a callable to use when
|
||||
the page is accessed, and the optional "source" key maps to callable
|
||||
returning the page source path.
|
||||
values are dicts as well: the "open" key maps to a callable to use when
|
||||
the page is accessed, and the optional "source" key maps to callable
|
||||
returning the page source path.
|
||||
- last_download: tuple of MimeType and path, or None.
|
||||
- identities: identities map.
|
||||
"""
|
||||
|
||||
def __init__(self, config, cert_stash):
|
||||
|
@ -69,7 +65,6 @@ class Browser:
|
|||
self.cache = {}
|
||||
self.special_pages = self.setup_special_pages()
|
||||
self.last_download: Optional[Tuple[MimeType, Path]] = None
|
||||
self.identities = load_identities(get_identities_list_path()) or {}
|
||||
self._current_url = ""
|
||||
|
||||
@property
|
||||
|
@ -91,11 +86,6 @@ class Browser:
|
|||
self._current_url = url
|
||||
self.set_status(url)
|
||||
|
||||
@property
|
||||
def current_scheme(self):
|
||||
"""Return the scheme of the current URL."""
|
||||
return parse_url(self._current_url)["scheme"] or ""
|
||||
|
||||
def setup_special_pages(self):
|
||||
"""Return a dict with the special pages functions."""
|
||||
return {
|
||||
|
@ -114,7 +104,6 @@ class Browser:
|
|||
def run(self, *args, **kwargs):
|
||||
"""Use curses' wrapper around _run."""
|
||||
os.environ.setdefault("ESCDELAY", "25")
|
||||
logging.info("Cursing…")
|
||||
curses.wrapper(self._run, *args, **kwargs)
|
||||
|
||||
def _run(self, stdscr, start_url=None):
|
||||
|
@ -261,7 +250,6 @@ class Browser:
|
|||
def refresh_status_line(self):
|
||||
"""Refresh status line contents."""
|
||||
text, pair, attributes = self.status_data
|
||||
logging.debug("Status: " + text)
|
||||
text = text[:self.w - 1]
|
||||
color = curses.color_pair(pair)
|
||||
self.status_line.addstr(0, 0, text, color | attributes)
|
||||
|
@ -307,15 +295,6 @@ class Browser:
|
|||
from bebop.browser.gemini import forget_certificate
|
||||
forget_certificate(self, words[1])
|
||||
|
||||
def get_user_text_input(self, status_text, char, prefix="", strip=False):
|
||||
"""Get user input from the command-line."""
|
||||
self.set_status(status_text)
|
||||
result = self.command_line.focus(char, prefix=prefix)
|
||||
self.reset_status()
|
||||
if strip:
|
||||
result = result.strip()
|
||||
return result
|
||||
|
||||
def open_url(self, url, base_url=None, redirects=0, assume_absolute=False,
|
||||
history=True, use_cache=True):
|
||||
"""Try to open an URL.
|
||||
|
@ -339,14 +318,12 @@ class Browser:
|
|||
self.set_status_error(f"Too many redirections ({url}).")
|
||||
return
|
||||
|
||||
current_scheme = self.current_scheme or "gemini"
|
||||
if assume_absolute or not self.current_url:
|
||||
parts = parse_url(url, absolute=True, default_scheme=current_scheme)
|
||||
parts = parse_url(url, absolute=True, default_scheme="gemini")
|
||||
else:
|
||||
parts = parse_url(url, default_scheme=current_scheme)
|
||||
parts = parse_url(url)
|
||||
|
||||
# If there is a no netloc part, try to join the URL.
|
||||
if parts["netloc"] is None and parts["scheme"] == current_scheme:
|
||||
if parts["scheme"] is None and parts["netloc"] is None:
|
||||
base_url = base_url or self.current_url
|
||||
if base_url:
|
||||
parts = parse_url(join_url(base_url, url))
|
||||
|
@ -354,10 +331,10 @@ class Browser:
|
|||
self.set_status_error(f"Can't open '{url}'.")
|
||||
return
|
||||
|
||||
# Replace URL passed as parameter by a sanitized one.
|
||||
# Replace URL passed as parameter by a proper absolute one.
|
||||
url = unparse_url(parts)
|
||||
|
||||
scheme = parts["scheme"]
|
||||
scheme = parts["scheme"] or ""
|
||||
if scheme == "gemini":
|
||||
from bebop.browser.gemini import open_gemini_url
|
||||
success = open_gemini_url(
|
||||
|
@ -546,15 +523,17 @@ class Browser:
|
|||
"""Add the current URL as bookmark."""
|
||||
if not self.current_url:
|
||||
return
|
||||
self.set_status("Bookmark title?")
|
||||
current_title = self.page_pad.current_page.title or ""
|
||||
title = self.get_user_text_input(
|
||||
"Bookmark title?",
|
||||
title = self.command_line.focus(
|
||||
CommandLine.CHAR_TEXT,
|
||||
prefix=current_title,
|
||||
strip=True,
|
||||
prefix=current_title
|
||||
)
|
||||
if title:
|
||||
save_bookmark(self.current_url, title)
|
||||
title = title.strip()
|
||||
if title:
|
||||
save_bookmark(self.current_url, title)
|
||||
self.reset_status()
|
||||
|
||||
def edit_page(self):
|
||||
"""Open a text editor to edit the page source.
|
||||
|
@ -593,7 +572,7 @@ class Browser:
|
|||
|
||||
def open_help(self):
|
||||
"""Show the help page."""
|
||||
self.open_internal_page("help", get_help(self.config))
|
||||
self.open_internal_page("help", HELP_PAGE)
|
||||
|
||||
def prompt(self, text, keys):
|
||||
"""Display the text and allow it to type one of the given keys."""
|
||||
|
|
|
@ -1,18 +1,10 @@
|
|||
"""Gemini-related features of the browser."""
|
||||
|
||||
import logging
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
from bebop.browser.browser import Browser
|
||||
from bebop.command_line import CommandLine
|
||||
from bebop.fs import (
|
||||
get_downloads_path, get_identities_path, get_identities_list_path
|
||||
)
|
||||
from bebop.identity import (
|
||||
ClientCertificateException, create_certificate, get_cert_and_key,
|
||||
get_identities_for_url, load_identities, save_identities
|
||||
)
|
||||
from bebop.fs import get_downloads_path
|
||||
from bebop.navigation import set_parameter
|
||||
from bebop.page import Page
|
||||
from bebop.protocol import Request, Response
|
||||
|
@ -22,13 +14,7 @@ from bebop.tofu import trust_fingerprint, untrust_fingerprint, WRONG_FP_ALERT
|
|||
MAX_URL_LEN = 1024
|
||||
|
||||
|
||||
def open_gemini_url(
|
||||
browser: Browser,
|
||||
url: str,
|
||||
redirects: int =0,
|
||||
use_cache: bool =True,
|
||||
cert_and_key=None
|
||||
) -> Optional[str]:
|
||||
def open_gemini_url(browser: Browser, url, redirects=0, use_cache=True):
|
||||
"""Open a Gemini URL and set the formatted response as content.
|
||||
|
||||
While the specification is not set in stone, every client takes a slightly
|
||||
|
@ -41,7 +27,7 @@ def open_gemini_url(
|
|||
- STATE_INVALID_CERT: the certificate has non-fatal issues; we may
|
||||
present the user the problems found and let her decide whether to trust
|
||||
temporarily the certificate or not BUT we currently do not parse the
|
||||
certificate's fields, not even the pubkey, so this state is never used.
|
||||
certificate's fields, so this state is never used.
|
||||
- STATE_UNKNOWN_CERT: the certificate is valid but has not been seen before;
|
||||
as we're doing TOFU here, we could automatically trust it or let the user
|
||||
choose. For simplicity, we always trust it permanently.
|
||||
|
@ -51,38 +37,23 @@ def open_gemini_url(
|
|||
- url: a valid URL with Gemini scheme to open.
|
||||
- redirects: current amount of redirections done to open the initial URL.
|
||||
- use_cache: if true, look up if the page is cached before requesting it.
|
||||
- cert_and_key: if not None, a tuple of paths to a client cert/key to use.
|
||||
|
||||
Returns:
|
||||
The final successfully handled URL on success, None otherwise. Redirected
|
||||
URLs are not returned.
|
||||
True on success, False otherwise.
|
||||
"""
|
||||
if len(url) >= MAX_URL_LEN:
|
||||
browser.set_status_error("Request URL too long.")
|
||||
return None
|
||||
return
|
||||
|
||||
loading_message_verb = "Loading" if redirects == 0 else "Redirecting to"
|
||||
loading_message = f"{loading_message_verb} {url}…"
|
||||
browser.set_status(loading_message)
|
||||
|
||||
# If this URL used to request an identity, provide it.
|
||||
if not cert_and_key:
|
||||
url_identities = get_identities_for_url(browser.identities, url)
|
||||
identity = select_identity(url_identities)
|
||||
if identity:
|
||||
cert_and_key = get_cert_and_key(identity["id"])
|
||||
browser.set_status(f"Loading {url}")
|
||||
|
||||
if use_cache and url in browser.cache:
|
||||
browser.load_page(browser.cache[url])
|
||||
browser.current_url = url
|
||||
browser.set_status(url)
|
||||
return url
|
||||
return True
|
||||
|
||||
logging.info(
|
||||
f"Request {url}"
|
||||
+ (f" using cert and key {cert_and_key}" if cert_and_key else "")
|
||||
)
|
||||
req = Request(url, browser.stash, identity=cert_and_key)
|
||||
req = Request(url, browser.stash)
|
||||
connect_timeout = browser.config["connect_timeout"]
|
||||
connected = req.connect(connect_timeout)
|
||||
if not connected:
|
||||
|
@ -97,7 +68,7 @@ def open_gemini_url(
|
|||
else:
|
||||
error = f"Connection failed ({url})."
|
||||
browser.set_status_error(error)
|
||||
return None
|
||||
return False
|
||||
|
||||
if req.state == Request.STATE_INVALID_CERT:
|
||||
pass
|
||||
|
@ -116,11 +87,11 @@ def open_gemini_url(
|
|||
data = req.proceed()
|
||||
if not data:
|
||||
browser.set_status_error(f"Server did not respond in time ({url}).")
|
||||
return None
|
||||
return False
|
||||
response = Response.parse(data)
|
||||
if not response:
|
||||
browser.set_status_error(f"Server response parsing failed ({url}).")
|
||||
return None
|
||||
return False
|
||||
|
||||
return _handle_response(browser, response, url, redirects)
|
||||
|
||||
|
@ -145,43 +116,26 @@ def _handle_untrusted_cert(browser: Browser, request: Request):
|
|||
browser.load_page(alert_page)
|
||||
|
||||
|
||||
def _handle_response(
|
||||
browser: Browser,
|
||||
response: Response,
|
||||
url: str,
|
||||
redirects: int
|
||||
) -> Optional[str]:
|
||||
def _handle_response(browser: Browser, response: Response, url: str,
|
||||
redirects: int):
|
||||
"""Handle a response from a Gemini server.
|
||||
|
||||
Returns:
|
||||
The final URL on success, None otherwise.
|
||||
True on success, False otherwise.
|
||||
"""
|
||||
logging.info(f"Response {response.code} {response.meta}")
|
||||
if response.code == 20:
|
||||
return _handle_successful_response(browser, response, url)
|
||||
elif response.generic_code == 30 and response.meta:
|
||||
# On redirections, we go back to open_url as the redirection may be to
|
||||
# another protocol. Discard the result of this request.
|
||||
browser.open_url(
|
||||
response.meta,
|
||||
base_url=url,
|
||||
redirects=redirects + 1
|
||||
)
|
||||
browser.open_url(response.meta, base_url=url, redirects=redirects + 1)
|
||||
elif response.generic_code in (40, 50):
|
||||
error = f"Server error: {response.meta or Response.code.name}"
|
||||
browser.set_status_error(error)
|
||||
elif response.generic_code == 10:
|
||||
return _handle_input_request(browser, url, response.meta)
|
||||
elif response.code == 60:
|
||||
return _handle_cert_required(browser, response, url, redirects)
|
||||
elif response.code in (61, 62):
|
||||
details = response.meta or Response.code.name
|
||||
error = f"Client certificate error: {details}"
|
||||
browser.set_status_error(error)
|
||||
_handle_input_request(browser, url, response.meta)
|
||||
else:
|
||||
error = f"Unhandled response code {response.code}"
|
||||
browser.set_status_error(error)
|
||||
return None
|
||||
return False
|
||||
|
||||
|
||||
def _handle_successful_response(browser: Browser, response: Response, url: str):
|
||||
|
@ -201,7 +155,7 @@ def _handle_successful_response(browser: Browser, response: Response, url: str):
|
|||
- response: a successful Response.
|
||||
|
||||
Returns:
|
||||
The successfully handled URL on success, None otherwise.
|
||||
True on success, False otherwise.
|
||||
"""
|
||||
# Use appropriate response parser according to the MIME type.
|
||||
mime_type = response.get_mime_type()
|
||||
|
@ -221,8 +175,7 @@ def _handle_successful_response(browser: Browser, response: Response, url: str):
|
|||
text = response.content.decode("utf-8", errors="replace")
|
||||
page = Page.from_text(text)
|
||||
else:
|
||||
download_dir = browser.config["download_path"]
|
||||
filepath = _get_download_path(url, download_dir=download_dir)
|
||||
filepath = _get_download_path(url)
|
||||
|
||||
# If a page has been produced, load it. Else if a file has been retrieved,
|
||||
# download it.
|
||||
|
@ -231,7 +184,7 @@ def _handle_successful_response(browser: Browser, response: Response, url: str):
|
|||
browser.current_url = url
|
||||
browser.cache[url] = page
|
||||
browser.set_status(url)
|
||||
return url
|
||||
return True
|
||||
elif filepath:
|
||||
try:
|
||||
with open(filepath, "wb") as download_file:
|
||||
|
@ -241,36 +194,26 @@ def _handle_successful_response(browser: Browser, response: Response, url: str):
|
|||
else:
|
||||
browser.set_status(f"Downloaded {url} ({mime_type.short}).")
|
||||
browser.last_download = mime_type, filepath
|
||||
return url
|
||||
return True
|
||||
elif error:
|
||||
browser.set_status_error(error)
|
||||
return None
|
||||
return False
|
||||
|
||||
|
||||
def _get_download_path(url: str, download_dir: Optional[str] =None) -> Path:
|
||||
def _get_download_path(url: str) -> Path:
|
||||
"""Try to find the best download file path possible from this URL."""
|
||||
download_path = Path(download_dir) if download_dir else get_downloads_path()
|
||||
if not download_path.exists():
|
||||
download_path.mkdir(parents=True)
|
||||
download_dir = get_downloads_path()
|
||||
url_parts = url.rsplit("/", maxsplit=1)
|
||||
if url_parts:
|
||||
filename = url_parts[-1]
|
||||
else:
|
||||
filename = url.split("://")[1] if "://" in url else url
|
||||
filename = filename.replace("/", "_")
|
||||
return download_path / filename
|
||||
return download_dir / filename
|
||||
|
||||
|
||||
def _handle_input_request(
|
||||
browser: Browser,
|
||||
from_url: str,
|
||||
message: str =None
|
||||
) -> Optional[str]:
|
||||
"""Focus command-line to pass input to the server.
|
||||
|
||||
Returns:
|
||||
The result of `open_gemini_url` with the new request including user input.
|
||||
"""
|
||||
def _handle_input_request(browser: Browser, from_url: str, message: str =None):
|
||||
"""Focus command-line to pass input to the server."""
|
||||
if message:
|
||||
browser.set_status(f"Input needed: {message}")
|
||||
else:
|
||||
|
@ -279,86 +222,12 @@ def _handle_input_request(
|
|||
if not user_input:
|
||||
return
|
||||
url = set_parameter(from_url, user_input)
|
||||
return open_gemini_url(browser, url)
|
||||
|
||||
|
||||
def _handle_cert_required(
|
||||
browser: Browser,
|
||||
response: Response,
|
||||
url: str,
|
||||
redirects: int
|
||||
) -> Optional[str]:
|
||||
"""Find a matching identity and resend the request with it.
|
||||
|
||||
Returns:
|
||||
The result of `open_gemini_url` with the client certificate provided.
|
||||
"""
|
||||
identities = load_identities(get_identities_list_path())
|
||||
if not identities:
|
||||
browser.set_status_error(f"Can't load identities.")
|
||||
return None
|
||||
browser.identities = identities
|
||||
|
||||
url_identities = get_identities_for_url(browser.identities, url)
|
||||
if not url_identities:
|
||||
identity = create_identity(browser, url)
|
||||
if not identity:
|
||||
return None
|
||||
browser.identities[url] = [identity]
|
||||
save_identities(browser.identities, get_identities_list_path())
|
||||
else:
|
||||
identity = select_identity(url_identities)
|
||||
|
||||
cert_path, key_path = get_cert_and_key(identity["id"])
|
||||
return open_gemini_url(
|
||||
browser,
|
||||
url,
|
||||
redirects=redirects + 1,
|
||||
use_cache=False,
|
||||
cert_and_key=(cert_path, key_path)
|
||||
)
|
||||
|
||||
|
||||
def select_identity(identities: list):
|
||||
"""Let user select the appropriate identity among candidates."""
|
||||
# TODO support multiple identities; for now we just use the first available.
|
||||
return identities[0] if identities else None
|
||||
|
||||
|
||||
def create_identity(browser: Browser, url: str):
|
||||
"""Walk the user through identity creation.
|
||||
|
||||
Returns:
|
||||
The created identity on success (already registered in identities
|
||||
"""
|
||||
key = browser.prompt("Create client certificate? [y/n]", "yn")
|
||||
if key != "y":
|
||||
browser.reset_status()
|
||||
return None
|
||||
|
||||
common_name = browser.get_user_text_input(
|
||||
"Name? The server will see this, you can leave it empty.",
|
||||
CommandLine.CHAR_TEXT,
|
||||
strip=True,
|
||||
)
|
||||
if not common_name:
|
||||
browser.reset_status()
|
||||
return None
|
||||
|
||||
browser.set_status("Generating certificate…")
|
||||
try:
|
||||
mangled_name = create_certificate(url, common_name)
|
||||
except ClientCertificateException as exc:
|
||||
browser.set_status_error(exc.message)
|
||||
return None
|
||||
|
||||
browser.reset_status()
|
||||
return {"name": common_name, "id": mangled_name}
|
||||
open_gemini_url(browser, url)
|
||||
|
||||
|
||||
def forget_certificate(browser: Browser, hostname: str):
|
||||
"""Remove the fingerprint associated to this hostname for the cert stash."""
|
||||
key = browser.prompt(f"Remove fingerprint for {hostname}? [y/n]", "yn")
|
||||
key = browser.prompt(f"Remove fingerprint from {hostname}? [y/N]", "ynN")
|
||||
if key != "y":
|
||||
browser.reset_status()
|
||||
return
|
||||
|
|
|
@ -5,7 +5,6 @@ import curses.ascii
|
|||
import curses.textpad
|
||||
import os
|
||||
import tempfile
|
||||
from typing import Optional
|
||||
|
||||
from bebop.external import open_external_program
|
||||
from bebop.links import Links
|
||||
|
@ -39,7 +38,7 @@ class CommandLine:
|
|||
self.window.clear()
|
||||
self.window.refresh()
|
||||
|
||||
def gather(self) -> str:
|
||||
def gather(self):
|
||||
"""Return the string currently written by the user in command line.
|
||||
|
||||
This doesn't count the command char used, but it includes then prefix.
|
||||
|
@ -47,13 +46,7 @@ class CommandLine:
|
|||
"""
|
||||
return self.textbox.gather()[1:].rstrip()
|
||||
|
||||
def focus(
|
||||
self,
|
||||
command_char,
|
||||
validator=None,
|
||||
prefix="",
|
||||
escape_to_none=False
|
||||
) -> Optional[str]:
|
||||
def focus(self, command_char, validator=None, prefix=""):
|
||||
"""Give user focus to the command bar.
|
||||
|
||||
Show the command char and give focus to the command textbox. The
|
||||
|
@ -65,12 +58,10 @@ class CommandLine:
|
|||
- validator: function to use to validate the input chars; if omitted,
|
||||
`validate_common_input` is used.
|
||||
- prefix: string to insert before the cursor in the command line.
|
||||
- escape_to_none: if True, an escape interruption returns None instead
|
||||
of an empty string.
|
||||
|
||||
Returns:
|
||||
User input as string. The string will be empty if the validator raised
|
||||
an EscapeInterrupt, unless `escape_to_none` is True.
|
||||
an EscapeInterrupt.
|
||||
"""
|
||||
validator = validator or self._validate_common_input
|
||||
self.window.clear()
|
||||
|
@ -80,7 +71,7 @@ class CommandLine:
|
|||
try:
|
||||
command = self.textbox.edit(validator)
|
||||
except EscapeCommandInterrupt:
|
||||
command = "" if not escape_to_none else None
|
||||
command = ""
|
||||
except TerminateCommandInterrupt as exc:
|
||||
command = exc.command
|
||||
else:
|
||||
|
|
|
@ -1,14 +1,12 @@
|
|||
"""Config management."""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import os.path
|
||||
|
||||
|
||||
DEFAULT_CONFIG = {
|
||||
"connect_timeout": 10,
|
||||
"text_width": 80,
|
||||
"download_path": "",
|
||||
"source_editor": ["vi"],
|
||||
"command_editor": ["vi"],
|
||||
"history_limit": 1000,
|
||||
|
@ -26,9 +24,9 @@ def load_config(config_path):
|
|||
with open(config_path, "rt") as config_file:
|
||||
config = json.load(config_file)
|
||||
except OSError as exc:
|
||||
logging.error(f"Could not read config file {config_path}: {exc}")
|
||||
print(f"Could not read config file {config_path}: {exc}")
|
||||
except ValueError as exc:
|
||||
logging.error(f"Could not parse config file {config_path}: {exc}")
|
||||
print(f"Could not parse config file {config_path}: {exc}")
|
||||
else:
|
||||
# Fill missing values with defaults.
|
||||
for key, value in DEFAULT_CONFIG.items():
|
||||
|
@ -43,4 +41,4 @@ def create_default_config(config_path):
|
|||
with open(config_path, "wt") as config_file:
|
||||
json.dump(DEFAULT_CONFIG, config_file, indent=2)
|
||||
except OSError as exc:
|
||||
logging.error(f"Could not create config file {config_path}: {exc}")
|
||||
print(f"Could not create config file {config_path}: {exc}")
|
||||
|
|
42
bebop/fs.py
42
bebop/fs.py
|
@ -8,7 +8,6 @@ from functools import lru_cache
|
|||
from os import getenv
|
||||
from os.path import expanduser
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
|
||||
APP_NAME = "bebop"
|
||||
|
@ -30,7 +29,7 @@ def get_user_data_path() -> Path:
|
|||
|
||||
@lru_cache(None)
|
||||
def get_downloads_path() -> Path:
|
||||
"""Return the user downloads directory path (fallbacks to home dir)."""
|
||||
"""Return the user downloads directory path."""
|
||||
xdg_config_path = Path(getenv("XDG_CONFIG_HOME", expanduser("~/.config")))
|
||||
download_path = ""
|
||||
try:
|
||||
|
@ -48,36 +47,9 @@ def get_downloads_path() -> Path:
|
|||
return Path.home()
|
||||
|
||||
|
||||
@lru_cache(None)
|
||||
def get_identities_list_path():
|
||||
"""Return the identities JSON file path."""
|
||||
return get_user_data_path() / "identities.json"
|
||||
|
||||
|
||||
@lru_cache(None)
|
||||
def get_identities_path():
|
||||
"""Return the directory where identities are stored."""
|
||||
return get_user_data_path() / "identities"
|
||||
|
||||
|
||||
def ensure_bebop_files_exist() -> Optional[str]:
|
||||
"""Ensure various Bebop's files or directories are present.
|
||||
|
||||
Returns:
|
||||
None if all files and directories are present, an error string otherwise.
|
||||
"""
|
||||
try:
|
||||
# Ensure the user data directory exists.
|
||||
user_data_path = get_user_data_path()
|
||||
if not user_data_path.exists():
|
||||
user_data_path.mkdir(parents=True)
|
||||
# Ensure the identities file and directory exists.
|
||||
identities_file_path = get_identities_list_path()
|
||||
if not identities_file_path.exists():
|
||||
with open(identities_file_path, "wt") as identities_file:
|
||||
identities_file.write("{}")
|
||||
identities_path = get_identities_path()
|
||||
if not identities_path.exists():
|
||||
identities_path.mkdir(parents=True)
|
||||
except OSError as exc:
|
||||
return str(exc)
|
||||
def ensure_bebop_files_exist():
|
||||
"""Ensure various Bebop's files or directories are present."""
|
||||
# Ensure the user data directory exists.
|
||||
user_data_path = get_user_data_path()
|
||||
if not user_data_path.exists():
|
||||
user_data_path.mkdir(parents=True)
|
||||
|
|
|
@ -46,18 +46,4 @@ with arguments by pressing the corresponding keybind above.
|
|||
* o/open <url>: open this URL
|
||||
* forget_certificate <hostname>: remove saved fingerprint for the hostname
|
||||
* q/quit: well, quit
|
||||
|
||||
## Configuration
|
||||
|
||||
The current configuration is:
|
||||
|
||||
{config_list}
|
||||
"""
|
||||
|
||||
|
||||
def get_help(config):
|
||||
config_list = "\n".join(
|
||||
f"* {key} = {value}"
|
||||
for key, value in config.items()
|
||||
)
|
||||
return HELP_PAGE.format(config_list=config_list)
|
||||
|
|
|
@ -1,131 +0,0 @@
|
|||
"""Identity management, i.e. client certificates.
|
||||
|
||||
Identities are created when a server requests them for the first time, and saved
|
||||
with the corresponding URL. The certificate is automatically presented when the
|
||||
URL is revisited, and all "children" URLs.
|
||||
|
||||
Identities are stored on disk as pairs of certificates/keys. URLs are stored in
|
||||
an identity file, `identities.json`, a simple URL dict that can be looked up for
|
||||
identities to use, mapped to an ID to identify the cert/key files.
|
||||
|
||||
The identity file and the identities dict both have the following format:
|
||||
|
||||
``` json
|
||||
{
|
||||
"gemini://example.com/app": [
|
||||
{
|
||||
"name": "test",
|
||||
"id": "geminiexamplecomapp-test",
|
||||
}
|
||||
]
|
||||
}
|
||||
```
|
||||
"""
|
||||
|
||||
import hashlib
|
||||
import json
|
||||
import logging
|
||||
import secrets
|
||||
import string
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
from typing import Optional, Union
|
||||
|
||||
from bebop.fs import get_identities_path, get_user_data_path
|
||||
|
||||
|
||||
def load_identities(identities_path: Path) -> Optional[dict]:
|
||||
"""Return saved identities or None on error."""
|
||||
identities = {}
|
||||
try:
|
||||
with open(identities_path, "rt") as identities_file:
|
||||
identities = json.load(identities_file)
|
||||
except (OSError, ValueError) as exc:
|
||||
logging.error(f"Failed to load identities '{identities_path}': {exc}")
|
||||
return None
|
||||
return identities
|
||||
|
||||
|
||||
def save_identities(identities: dict, identities_path: Path):
|
||||
"""Save the certificate stash. Return True on success."""
|
||||
try:
|
||||
with open(identities_path, "wt") as identities_file:
|
||||
json.dump(identities, identities_file)
|
||||
except (OSError, ValueError) as exc:
|
||||
logging.error(f"Failed to save identities '{identities_path}': {exc}")
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
class ClientCertificateException(Exception):
|
||||
|
||||
def __init__(self, message: str) -> None:
|
||||
super().__init__()
|
||||
self.message = message
|
||||
|
||||
|
||||
def get_identities_for_url(identities: dict, url: str) -> list:
|
||||
"""For a given URL, return all its identities.
|
||||
|
||||
If several URLs are prefixes of the given URL, e.g. we look up
|
||||
"gemini://host/app/sub" and there are identities for both
|
||||
"gemini://host/app" and "gemini://host/app/sub", the longest URL's
|
||||
identities are returned (here the latter).
|
||||
"""
|
||||
candidates = [key for key in identities if url.startswith(key)]
|
||||
if not candidates:
|
||||
return []
|
||||
return identities[max(candidates, key=len)]
|
||||
|
||||
|
||||
def get_cert_and_key(cert_id: str):
|
||||
"""Return the paths of the certificate and key file for this ID."""
|
||||
directory = get_identities_path()
|
||||
return directory / f"{cert_id}.crt", directory / f"{cert_id}.key"
|
||||
|
||||
|
||||
def create_certificate(url: str, common_name: str):
|
||||
"""Create a secure self-signed certificate using system's OpenSSL."""
|
||||
identities_path = get_identities_path()
|
||||
mangled_name = get_mangled_name(url, common_name)
|
||||
cert_path = identities_path / f"{mangled_name}.crt"
|
||||
key_path = identities_path / f"{mangled_name}.key"
|
||||
command = [
|
||||
"openssl", "req",
|
||||
"-newkey", "rsa:4096",
|
||||
"-nodes",
|
||||
"-keyform", "PEM",
|
||||
"-keyout", str(key_path),
|
||||
"-x509",
|
||||
"-days", "28140", # https://www.youtube.com/watch?v=F9L4q-0Pi4E
|
||||
"-outform", "PEM",
|
||||
"-out", str(cert_path),
|
||||
"-subj", f"/CN={common_name}",
|
||||
]
|
||||
try:
|
||||
subprocess.check_call(
|
||||
command,
|
||||
stdout=subprocess.DEVNULL,
|
||||
stderr=subprocess.DEVNULL,
|
||||
)
|
||||
except subprocess.CalledProcessError as exc:
|
||||
error = "Could not create certificate: " + str(exc)
|
||||
raise ClientCertificateException(error)
|
||||
return mangled_name
|
||||
|
||||
|
||||
def get_mangled_name(url: str, common_name: str) -> str:
|
||||
"""Return a mangled name for the certificate and key files.
|
||||
|
||||
This is not obfuscation at all. The mangling is extremely simple and is
|
||||
just a way to produce names easier on the file system than full URLs.
|
||||
|
||||
The mangling is:
|
||||
`sha256(md5(url) + "-" + common_name + "-" + 8_random_hex_digits)`
|
||||
with characters that can't be UTF-8 encoded replaced by U+FFFD REPLACEMENT
|
||||
CHARACTER.
|
||||
"""
|
||||
encoded_url = hashlib.md5(url.encode(errors="replace")).hexdigest()
|
||||
random_hex = hex(secrets.randbits(32))[2:].zfill(8)
|
||||
name = f"{encoded_url}-{common_name}-{random_hex}"
|
||||
return hashlib.sha256(name.encode(errors="replace")).hexdigest()
|
|
@ -45,11 +45,11 @@ def parse_url(
|
|||
- absolute: assume the URL is absolute, e.g. in the case we are trying to
|
||||
parse an URL an user has written, which is most of the time an absolute
|
||||
URL even if not perfectly so. This only has an effect if, after the
|
||||
initial parsing, there is no netloc available.
|
||||
initial parsing, there is no scheme or netloc available.
|
||||
- default_scheme: specify the scheme to use if the URL either does not
|
||||
specify it and we need it (e.g. there is a location), or `absolute` is
|
||||
true; if absolute is true but `default_scheme` is not specified, a netloc
|
||||
marker ("//") is prefixed without scheme.
|
||||
true; if absolute is true but `default_scheme` is not specified, use the
|
||||
gemini scheme.
|
||||
|
||||
Returns:
|
||||
URL parts, as a dictionary with the following keys: "scheme", "netloc",
|
||||
|
@ -69,12 +69,10 @@ def parse_url(
|
|||
for k in ("scheme", "netloc", "path", "query", "fragment")
|
||||
}
|
||||
|
||||
# Smol hack: if we assume it's an absolute URL and no netloc has been found,
|
||||
# just prefix default scheme (if any) and "//".
|
||||
if absolute and not parts["netloc"]:
|
||||
scheme = parts["scheme"] or default_scheme
|
||||
prefix = scheme + "://" if scheme else "//"
|
||||
return parse_url(prefix + url)
|
||||
# Smol hack: if we assume it's an absolute URL, just prefix scheme and "//".
|
||||
if absolute and not parts["scheme"] and not parts["netloc"]:
|
||||
scheme = default_scheme or "gemini"
|
||||
return parse_url(scheme + "://" + url)
|
||||
|
||||
# Another smol hack: if there is no scheme, use `default_scheme` as default.
|
||||
if default_scheme and parts["scheme"] is None:
|
||||
|
|
|
@ -33,10 +33,7 @@ class PagePad:
|
|||
if x <= 0 or y <= 0:
|
||||
return
|
||||
content_position = self.current_line, self.current_column
|
||||
try:
|
||||
self.pad.refresh(*content_position, 0, 0, x, y)
|
||||
except curses.error:
|
||||
pass
|
||||
self.pad.refresh(*content_position, 0, 0, x, y)
|
||||
|
||||
def scroll_v(self, num_lines: int, window_height: int =0):
|
||||
"""Make the content pad scroll up and down by num_lines.
|
||||
|
|
|
@ -58,7 +58,7 @@ class Request:
|
|||
# Connection failed.
|
||||
STATE_CONNECTION_FAILED = 7
|
||||
|
||||
def __init__(self, url, cert_stash, identity=None):
|
||||
def __init__(self, url, cert_stash):
|
||||
self.url = url
|
||||
self.cert_stash = cert_stash
|
||||
self.state = Request.STATE_INIT
|
||||
|
@ -67,7 +67,6 @@ class Request:
|
|||
self.ssock = None
|
||||
self.cert_validation = None
|
||||
self.error = ""
|
||||
self.identity = identity
|
||||
|
||||
def connect(self, timeout: int) -> bool:
|
||||
"""Connect to a Gemini server and return a RequestEventType.
|
||||
|
@ -121,7 +120,6 @@ class Request:
|
|||
check the whole cert fingerprint. Here it is considered the same as a
|
||||
valid certificate.
|
||||
"""
|
||||
# Get hostname and port from the URL.
|
||||
url_parts = GEMINI_URL_RE.match(self.url)
|
||||
if not url_parts:
|
||||
self.state = Request.STATE_INVALID_URL
|
||||
|
@ -138,7 +136,6 @@ class Request:
|
|||
port = 1965
|
||||
self.hostname = hostname
|
||||
|
||||
# Prepare the Gemini request.
|
||||
try:
|
||||
self.payload = self.url.encode()
|
||||
except ValueError:
|
||||
|
@ -146,7 +143,6 @@ class Request:
|
|||
return False
|
||||
self.payload += LINE_TERM
|
||||
|
||||
# Connect to the server.
|
||||
try:
|
||||
sock = socket.create_connection((hostname, port), timeout=timeout)
|
||||
except OSError as exc:
|
||||
|
@ -154,10 +150,7 @@ class Request:
|
|||
self.error = exc.strerror
|
||||
return False
|
||||
|
||||
# Setup TLS.
|
||||
context = Request.get_ssl_context()
|
||||
if self.identity:
|
||||
context.load_cert_chain(*self.identity)
|
||||
try:
|
||||
self.ssock = context.wrap_socket(sock, server_hostname=hostname)
|
||||
except OSError as exc:
|
||||
|
@ -166,7 +159,6 @@ class Request:
|
|||
self.error = exc.strerror
|
||||
return False
|
||||
|
||||
# Validate server certificate.
|
||||
der = self.ssock.getpeercert(binary_form=True)
|
||||
self.cert_validation = validate_cert(der, hostname, self.cert_stash)
|
||||
cert_status = self.cert_validation["status"]
|
||||
|
|
|
@ -1,32 +0,0 @@
|
|||
import unittest
|
||||
|
||||
from ..identity import get_identities_for_url
|
||||
|
||||
|
||||
def get_fake_identity(ident: int):
|
||||
return {"name": f"test{ident}", "id": f"lol{ident}"}
|
||||
|
||||
|
||||
class TestIdentity(unittest.TestCase):
|
||||
|
||||
def test_get_identities_for_url(self):
|
||||
result = get_identities_for_url({}, "gemini://host/path")
|
||||
self.assertListEqual(result, [])
|
||||
|
||||
identities = {
|
||||
"gemini://host/path": [get_fake_identity(1)],
|
||||
"gemini://otherhost/path": [get_fake_identity(2)],
|
||||
}
|
||||
|
||||
result = get_identities_for_url(identities, "gemini://host/path")
|
||||
self.assertListEqual(result, identities["gemini://host/path"])
|
||||
result = get_identities_for_url(identities, "gemini://bad/path")
|
||||
self.assertListEqual(result, [])
|
||||
|
||||
identities["gemini://host/path/sub"] = [get_fake_identity(3)]
|
||||
result = get_identities_for_url(identities, "gemini://host/path/sub")
|
||||
self.assertListEqual(result, identities["gemini://host/path/sub"])
|
||||
result = get_identities_for_url(identities, "gemini://host/path/sub/a")
|
||||
self.assertListEqual(result, identities["gemini://host/path/sub"])
|
||||
result = get_identities_for_url(identities, "gemini://host/path/sus")
|
||||
self.assertListEqual(result, identities["gemini://host/path"])
|
|
@ -35,7 +35,7 @@ class TestNavigation(unittest.TestCase):
|
|||
|
||||
# No scheme nor netloc but we should pretend having an absolute URL.
|
||||
res = parse_url("dece.space/parse-me.gmi", absolute=True)
|
||||
self.assertIsNone(res["scheme"])
|
||||
self.assertEqual(res["scheme"], "gemini")
|
||||
self.assertEqual(res["netloc"], "dece.space")
|
||||
self.assertEqual(res["path"], "/parse-me.gmi")
|
||||
|
||||
|
|
|
@ -5,7 +5,6 @@ requires more clarity both in specification and in our own implementation.
|
|||
"""
|
||||
|
||||
import hashlib
|
||||
import logging
|
||||
import re
|
||||
from enum import Enum
|
||||
from pathlib import Path
|
||||
|
@ -58,8 +57,8 @@ confident enough before trusting this new certificate.
|
|||
### How to ensure this new certificate can be trusted?
|
||||
|
||||
Can you join the owner through mail or instant messaging? This is the simplest \
|
||||
way for you to make sure that the server is fine, and maybe alert the server \
|
||||
owner that there might be an issue.
|
||||
way for you to make sure that the server is fine, and maybe alert the owner on \
|
||||
a problem on his server she did not notice.
|
||||
"""
|
||||
|
||||
|
||||
|
@ -69,7 +68,7 @@ def get_cert_stash_path() -> Path:
|
|||
|
||||
|
||||
def load_cert_stash(stash_path: Path) -> Optional[Dict]:
|
||||
"""Return the certificate stash from the file, or None on error.
|
||||
"""Load the certificate stash from the file, or None on error.
|
||||
|
||||
The stash is a dict with host names as keys and tuples as values. Tuples
|
||||
have four elements:
|
||||
|
@ -106,7 +105,7 @@ def save_cert_stash(stash: dict, stash_path: Path):
|
|||
entry_line = f"{name} {algo} {fingerprint}\n"
|
||||
stash_file.write(entry_line)
|
||||
except (OSError, ValueError) as exc:
|
||||
logging.error(f"Failed to save certificate stash '{stash_path}': {exc}")
|
||||
print(f"Failed to save certificate stash '{stash_path}': {exc}")
|
||||
|
||||
|
||||
class CertStatus(Enum):
|
||||
|
|
Reference in a new issue