browser: split in several files

exec
dece 3 years ago
parent 5b3e91336f
commit bd7cfce520

@ -1,6 +1,6 @@
import argparse
from bebop.browser import Browser
from bebop.browser.browser import Browser
from bebop.fs import get_user_data_path
from bebop.tofu import load_cert_stash, save_cert_stash

@ -4,7 +4,6 @@ import curses
import curses.ascii
import curses.textpad
import os
import webbrowser
from math import inf
from bebop.bookmarks import get_bookmarks_document, save_bookmark
@ -13,15 +12,15 @@ from bebop.command_line import CommandLine
from bebop.history import History
from bebop.links import Links
from bebop.mouse import ButtonState
from bebop.navigation import *
from bebop.navigation import (
get_parent_url, get_root_url, join_url, parse_url, sanitize_url)
from bebop.page import Page
from bebop.page_pad import PagePad
from bebop.protocol import Request, Response
class Browser:
"""Manage the events, inputs and rendering."""
def __init__(self, cert_stash):
self.stash = cert_stash or {}
self.screen = None
@ -251,133 +250,38 @@ class Browser:
if redirects > 5:
self.set_status_error(f"Too many redirections ({url}).")
return
if assume_absolute or not self.current_url:
parts = parse_url(url, absolute=True)
join = False
else:
parts = parse_url(url)
join = True
if parts.scheme == "gemini":
from bebop.browser.gemini import open_gemini_url
# If there is no netloc, this is a relative URL.
if join or base_url:
url = join_url(base_url or self.current_url, url)
self.open_gemini_url(sanitize_url(url), redirects=redirects,
history=history, use_cache=use_cache)
open_gemini_url(
self,
sanitize_url(url),
redirects=redirects,
history=history,
use_cache=use_cache
)
elif parts.scheme.startswith("http"):
self.open_web_url(url)
from bebop.browser.web import open_web_url
open_web_url(self, url)
elif parts.scheme == "file":
self.open_file(parts.path, history=history)
from bebop.browser.file import open_file
open_file(self, parts.path, history=history)
elif parts.scheme == "bebop":
if parts.netloc == "bookmarks":
self.open_bookmarks()
else:
self.set_status_error(f"Protocol {parts.scheme} not supported.")
def open_gemini_url(self, url, redirects=0, history=True, use_cache=True):
"""Open a Gemini URL and set the formatted response as content.
After initiating the connection, TODO
"""
self.set_status(f"Loading {url}")
if use_cache and url in self.cache:
self.load_page(self.cache[url])
if self.current_url and history:
self.history.push(self.current_url)
self.current_url = url
self.set_status(url)
return
req = Request(url, self.stash)
connected = req.connect()
if not connected:
if req.state == Request.STATE_ERROR_CERT:
error = f"Certificate was missing or corrupt ({url})."
elif req.state == Request.STATE_UNTRUSTED_CERT:
error = f"Certificate has been changed ({url})."
# TODO propose the user ways to handle this.
elif req.state == Request.STATE_CONNECTION_FAILED:
error_details = f": {req.error}" if req.error else "."
error = f"Connection failed ({url})" + error_details
else:
error = f"Connection failed ({url})."
self.set_status_error(error)
return
if req.state == Request.STATE_INVALID_CERT:
# TODO propose abort / temp trust
pass
elif req.state == Request.STATE_UNKNOWN_CERT:
# TODO propose abort / temp trust / perm trust
pass
else:
pass # TODO
data = req.proceed()
if not data:
self.set_status_error(f"Server did not respond in time ({url}).")
return
response = Response.parse(data)
if not response:
self.set_status_error(f"Server response parsing failed ({url}).")
return
if response.code == 20:
handle_code = self.handle_response_content(response)
if handle_code == 0:
if self.current_url and history:
self.history.push(self.current_url)
self.current_url = url
self.cache[url] = self.page_pad.current_page
self.set_status(url)
elif handle_code == 1:
self.set_status(f"Downloaded {url}.")
elif response.generic_code == 30 and response.meta:
self.open_url(response.meta, base_url=url, redirects=redirects + 1)
elif response.generic_code in (40, 50):
error = f"Server error: {response.meta or Response.code.name}"
self.set_status_error(error)
elif response.generic_code == 10:
self.handle_input_request(url, response)
else:
error = f"Unhandled response code {response.code}"
self.set_status_error(error)
def handle_response_content(self, response: Response) -> int:
"""Handle a response's content from a Gemini server.
According to the MIME type received or inferred, render or download the
response's content.
Currently only text/gemini content is rendered.
Arguments:
- response: a successful Response.
Returns:
An error code: 0 means a page has been loaded, so any book-keeping such
as history management can be applied; 1 means a content has been
successfully retrieved but has not been displayed (e.g. non-text
content) nor saved as a page; 2 means that the content could not be
handled, either due to bogus MIME type or MIME parameters.
"""
mime_type = response.get_mime_type()
if mime_type.main_type == "text":
if mime_type.sub_type == "gemini":
encoding = mime_type.charset
try:
text = response.content.decode(encoding, errors="replace")
except LookupError:
self.set_status_error("Unknown encoding {encoding}.")
return 2
self.load_page(Page.from_gemtext(text))
return 0
else:
pass # TODO
else:
pass # TODO
return 1
def load_page(self, page: Page):
"""Load Gemtext data as the current page."""
old_pad_height = self.page_pad.dim[0]
@ -407,17 +311,6 @@ class Browser:
return
self.open_url(links[link_id])
def handle_input_request(self, from_url: str, response: Response):
"""Focus command-line to pass input to the server."""
if response.meta:
self.set_status(f"Input needed: {response.meta}")
else:
self.set_status("Input needed:")
user_input = self.command_line.focus("?")
if user_input:
url = set_parameter(from_url, user_input)
self.open_gemini_url(url)
def handle_mouse(self, mouse_id: int, x: int, y: int, z: int, bstate: int):
"""Handle mouse events.
@ -488,11 +381,7 @@ class Browser:
def reload_page(self):
"""Reload the page, if one has been previously loaded."""
if self.current_url:
self.open_url(
self.current_url,
history=False,
use_cache=False
)
self.open_url(self.current_url, history=False, use_cache=False)
def go_back(self):
"""Go back in history if possible."""
@ -502,37 +391,12 @@ class Browser:
def go_to_parent_page(self):
"""Go to the parent URL if possible."""
if self.current_url:
self.open_gemini_url(get_parent_url(self.current_url))
self.open_url(get_parent_url(self.current_url))
def go_to_root_page(self):
"""Go to the root URL if possible."""
if self.current_url:
self.open_gemini_url(get_root_url(self.current_url))
def open_web_url(self, url):
"""Open a Web URL. Currently relies in Python's webbrowser module."""
self.set_status(f"Opening {url}")
webbrowser.open_new_tab(url)
def open_file(self, filepath, encoding="utf-8", history=True):
"""Open a file and render it.
This should be used only on Gemtext files or at least text files.
Anything else will produce garbage and may crash the program. In the
future this should be able to use a different parser according to a MIME
type or something.
"""
try:
with open(filepath, "rt", encoding=encoding) as f:
text = f.read()
except (OSError, ValueError) as exc:
self.set_status_error(f"Failed to open file: {exc}")
return
self.load_page(Page.from_gemtext(text))
file_url = "file://" + filepath
if history:
self.history.push(file_url)
self.current_url = file_url
self.open_url(get_root_url(self.current_url))
def open_bookmarks(self):
"""Open bookmarks."""

@ -0,0 +1,25 @@
"""Local files browser."""
from bebop.browser.browser import Browser
from bebop.page import Page
def open_file(browser: Browser, filepath: str, encoding="utf-8", history=True):
"""Open a file and render it.
This should be used only on Gemtext files or at least text files.
Anything else will produce garbage and may crash the program. In the
future this should be able to use a different parser according to a MIME
type or something.
"""
try:
with open(filepath, "rt", encoding=encoding) as f:
text = f.read()
except (OSError, ValueError) as exc:
browser.set_status_error(f"Failed to open file: {exc}")
return
browser.load_page(Page.from_gemtext(text))
file_url = "file://" + filepath
if history:
browser.history.push(file_url)
browser.current_url = file_url

@ -0,0 +1,126 @@
"""Gemini-related features of the browser."""
from bebop.browser.browser import Browser
from bebop.navigation import set_parameter
from bebop.page import Page
from bebop.protocol import Request, Response
def open_gemini_url(browser: Browser, url, redirects=0, history=True,
use_cache=True):
"""Open a Gemini URL and set the formatted response as content.
After initiating the connection, TODO
"""
browser.set_status(f"Loading {url}")
if use_cache and url in browser.cache:
browser.load_page(browser.cache[url])
if browser.current_url and history:
browser.history.push(browser.current_url)
browser.current_url = url
browser.set_status(url)
return
req = Request(url, browser.stash)
connected = req.connect()
if not connected:
if req.state == Request.STATE_ERROR_CERT:
error = f"Certificate was missing or corrupt ({url})."
elif req.state == Request.STATE_UNTRUSTED_CERT:
error = f"Certificate has been changed ({url})."
# TODO propose the user ways to handle this.
elif req.state == Request.STATE_CONNECTION_FAILED:
error_details = f": {req.error}" if req.error else "."
error = f"Connection failed ({url})" + error_details
else:
error = f"Connection failed ({url})."
browser.set_status_error(error)
return
if req.state == Request.STATE_INVALID_CERT:
# TODO propose abort / temp trust
pass
elif req.state == Request.STATE_UNKNOWN_CERT:
# TODO propose abort / temp trust / perm trust
pass
else:
pass # TODO
data = req.proceed()
if not data:
browser.set_status_error(f"Server did not respond in time ({url}).")
return
response = Response.parse(data)
if not response:
browser.set_status_error(f"Server response parsing failed ({url}).")
return
if response.code == 20:
handle_code = handle_response_content(browser, response)
if handle_code == 0:
if browser.current_url and history:
browser.history.push(browser.current_url)
browser.current_url = url
browser.cache[url] = browser.page_pad.current_page
browser.set_status(url)
elif handle_code == 1:
browser.set_status(f"Downloaded {url}.")
elif response.generic_code == 30 and response.meta:
browser.open_url(response.meta, base_url=url, redirects=redirects + 1)
elif response.generic_code in (40, 50):
error = f"Server error: {response.meta or Response.code.name}"
browser.set_status_error(error)
elif response.generic_code == 10:
handle_input_request(browser, url, response.meta)
else:
error = f"Unhandled response code {response.code}"
browser.set_status_error(error)
def handle_response_content(browser: Browser, response: Response) -> int:
"""Handle a response's content from a Gemini server.
According to the MIME type received or inferred, render or download the
response's content.
Currently only text/gemini content is rendered.
Arguments:
- response: a successful Response.
Returns:
An error code: 0 means a page has been loaded, so any book-keeping such
as history management can be applied; 1 means a content has been
successfully retrieved but has not been displayed (e.g. non-text
content) nor saved as a page; 2 means that the content could not be
handled, either due to bogus MIME type or MIME parameters.
"""
mime_type = response.get_mime_type()
if mime_type.main_type == "text":
if mime_type.sub_type == "gemini":
encoding = mime_type.charset
try:
text = response.content.decode(encoding, errors="replace")
except LookupError:
browser.set_status_error("Unknown encoding {encoding}.")
return 2
browser.load_page(Page.from_gemtext(text))
return 0
else:
pass # TODO
else:
pass # TODO
return 1
def handle_input_request(browser: Browser, from_url: str, message: str =None):
"""Focus command-line to pass input to the server."""
if message:
browser.set_status(f"Input needed: {message}")
else:
browser.set_status("Input needed:")
user_input = browser.command_line.focus("?")
if user_input:
url = set_parameter(from_url, user_input)
open_gemini_url(browser, url)

@ -0,0 +1,11 @@
"""Ha! You thought there would be a Web browser in there?"""
import webbrowser
from bebop.browser.browser import Browser
def open_web_url(browser: Browser, url):
"""Open a Web URL. Currently relies in Python's webbrowser module."""
browser.set_status(f"Opening {url}")
webbrowser.open_new_tab(url)

@ -98,15 +98,16 @@ def generate_metalines(elements, width):
def format_title(title: Title, context: dict):
"""Return metalines for this title."""
width = context["width"]
if title.level == 1:
wrapped = wrap_words(title.text, context["width"])
line_template = f"{{:^{context['width']}}}"
wrapped = wrap_words(title.text, width)
line_template = f"{{:^{width}}}"
lines = (line_template.format(line) for line in wrapped)
else:
if title.level == 2:
lines = wrap_words(title.text, context["width"], indent=2)
lines = wrap_words(title.text, width, indent=2)
else:
lines = wrap_words(title.text, context["width"])
lines = wrap_words(title.text, width)
# Title levels match the type constants of titles.
return [({"type": LineType(title.level)}, line) for line in lines]

Loading…
Cancel
Save