This repository has been archived on 2024-08-20. You can view files and clone it, but cannot push or open issues or pull requests.
Bebop/bebop/protocol.py

222 lines
6.9 KiB
Python
Raw Normal View History

2021-03-11 19:16:15 +01:00
"""Gemini protocol implementation."""
2021-02-12 19:01:42 +01:00
import re
import socket
import ssl
from dataclasses import dataclass
from enum import IntEnum
from bebop.tofu import CertStatus, CERT_STATUS_INVALID, validate_cert
# Matches a Gemini URL. The "host" group captures everything between the
# scheme and the first slash (so it may still carry a ":port" suffix); the
# "path" group captures the remainder of the URL and may be empty.
GEMINI_URL_RE = re.compile(r"gemini://(?P<host>[^/]+)(?P<path>.*)")
# CRLF line terminator: appended to the request line and used to locate the
# end of the response header.
LINE_TERM = b"\r\n"
def parse_gemini_url(url):
    """Split a Gemini URL into its host and path components.

    Return a dict with the "host" and "path" entries captured from the URL,
    or None when the URL does not start with a valid Gemini scheme and host.
    """
    parsed = GEMINI_URL_RE.match(url)
    if parsed is None:
        return None
    return parsed.groupdict()
class Request:
    """A Gemini request.

    Details about the request itself can be found in the Gemini specification.

    This class allows you to do a request in two steps: first opening the
    TLS connection to apply security checks, then aborting or proceeding by
    sending the request header and receiving the response:

    1. Instantiate a Request.
    2. `connect` opens the connection, leaves the caller free to check stuff.
    3. `proceed` or `abort` can be called.
    """

    # Initial state, connection is not established yet.
    STATE_INIT = 0
    # An error has occurred during cert verification, connection is aborted.
    STATE_ERROR_CERT = 1
    # An invalid URL has been provided, connection is aborted.
    STATE_INVALID_URL = 2
    # Invalid cert: user should abort or temporarily trust the cert.
    STATE_INVALID_CERT = 3
    # Unknown cert: user should abort, temporarily or always trust the cert.
    STATE_UNKNOWN_CERT = 4
    # Untrusted cert: connection is aborted, manually edit the stash.
    STATE_UNTRUSTED_CERT = 5
    # Valid and trusted cert: proceed.
    STATE_OK = 6
    # Connection failed.
    STATE_CONNECTION_FAILED = 7

    # Port used when the URL does not specify one.
    DEFAULT_PORT = 1965

    def __init__(self, url, cert_stash):
        """Prepare a request for `url`; no network activity happens here.

        `cert_stash` is handed unchanged to `validate_cert` during `connect`.
        """
        self.url = url
        self.cert_stash = cert_stash
        self.state = Request.STATE_INIT
        # Raw bytes sent by `proceed`; filled in by `connect`.
        self.payload = b""
        # TLS socket, set once the handshake has succeeded.
        self.ssock = None
        self.cert = None
        self.cert_status = None
        # Human-readable reason set when the connection fails.
        self.error = ""

    def connect(self):
        """Connect to a Gemini server and report whether it succeeded.

        Return True if the connection is established. The caller has to verify
        the request state and propose appropriate choices to the user if the
        certificate status is not CertStatus.VALID (Request.STATE_OK).

        If connect returns False, `state` tells why and no connection is left
        open. If connect returns True, it is up to the caller to decide
        whether to continue (call proceed) the connection or abort it (call
        abort).
        """
        url_parts = parse_gemini_url(self.url)
        if not url_parts:
            self.state = Request.STATE_INVALID_URL
            return False

        hostname = url_parts["host"]
        port = Request.DEFAULT_PORT
        if ":" in hostname:
            hostname, port_text = hostname.split(":", maxsplit=1)
            try:
                port = int(port_text)
            except ValueError:
                self.state = Request.STATE_INVALID_URL
                return False

        try:
            self.payload = self.url.encode()
        except ValueError:
            self.state = Request.STATE_INVALID_URL
            return False
        self.payload += LINE_TERM

        try:
            sock = socket.create_connection((hostname, port), timeout=10)
        except OSError as exc:
            self.state = Request.STATE_CONNECTION_FAILED
            self.error = Request._error_message(exc)
            return False

        context = Request.get_ssl_context()
        try:
            self.ssock = context.wrap_socket(sock, server_hostname=hostname)
        except OSError as exc:
            # The handshake failed: the plain socket is still ours to close,
            # otherwise it would leak until garbage collection.
            sock.close()
            self.state = Request.STATE_CONNECTION_FAILED
            self.error = Request._error_message(exc)
            return False

        # The SSL context does no verification itself; the certificate is
        # checked here against the stash instead.
        der = self.ssock.getpeercert(binary_form=True)
        self.cert_status, self.cert = \
            validate_cert(der, hostname, self.cert_stash)

        if self.cert_status == CertStatus.ERROR:
            self.abort()
            self.state = Request.STATE_ERROR_CERT
            return False
        if self.cert_status == CertStatus.WRONG_FINGERPRINT:
            self.abort()
            self.state = Request.STATE_UNTRUSTED_CERT
            return False

        if self.cert_status in CERT_STATUS_INVALID:
            self.state = Request.STATE_INVALID_CERT
        elif self.cert_status == CertStatus.VALID_NEW:
            self.state = Request.STATE_UNKNOWN_CERT
        else:  # self.cert_status == CertStatus.VALID
            self.state = Request.STATE_OK
        return True

    def abort(self):
        """Close the connection; do nothing if none has been opened."""
        if self.ssock is not None:
            self.ssock.close()

    def proceed(self):
        """Complete the request: send the payload and return received data.

        Data is read until the server closes the connection or a read times
        out; whatever has been received by then is returned as bytes.
        """
        self.ssock.sendall(self.payload)
        # Collect chunks and join once to avoid repeated bytes concatenation.
        chunks = []
        while True:
            try:
                buf = self.ssock.recv(4096)
            except socket.timeout:
                break
            if not buf:
                break
            chunks.append(buf)
        return b"".join(chunks)

    @staticmethod
    def get_ssl_context():
        """Return a secure SSL context that is adequate for Gemini.

        TLS versions below 1.2 are refused. Hostname checking and certificate
        verification are disabled in the context itself.
        """
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        context.minimum_version = ssl.TLSVersion.TLSv1_2
        # check_hostname must be turned off before verify_mode can be set to
        # CERT_NONE.
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE
        return context

    @staticmethod
    def _error_message(exc):
        """Return a non-empty description of an OSError."""
        # strerror is None for some OSError subclasses (e.g. timeouts).
        return exc.strerror or str(exc)
class StatusCode(IntEnum):
    """Gemini response status codes.

    Looking up a value that is not listed here yields StatusCode.UNKNOWN
    instead of raising ValueError.
    """

    UNKNOWN = 0
    INPUT = 10
    SENSITIVE_INPUT = 11
    SUCCESS = 20
    REDIRECT = 30
    PERMANENT_REDIRECT = 31
    TEMP_FAILURE = 40
    SERVER_UNAVAILABLE = 41
    CGI_ERROR = 42
    PROXY_ERROR = 43
    SLOW_DOWN = 44
    PERM_FAILURE = 50
    NOT_FOUND = 51
    GONE = 52
    PROXY_REQUEST_REFUSED = 53
    BAD_REQUEST = 59
    CERT_REQUIRED = 60
    CERT_NOT_AUTHORISED = 61
    CERT_NOT_VALID = 62

    @classmethod
    def _missing_(cls, value):
        """Enum lookup hook: map any unlisted value to UNKNOWN."""
        return cls.UNKNOWN
@dataclass
class Response:
    """A Gemini response."""

    # Status code from the response header.
    code: StatusCode
    # Text following the status code in the response header.
    meta: str = ""
    # Response body; only filled in by `parse` for success responses.
    content: bytes = b""

    # Header format: two ASCII digits, one space, then the meta text.
    # [0-9] is used rather than \d, which also matches non-ASCII digits.
    HEADER_RE = re.compile(r"([0-9]{2}) (.*)")
    # Maximum accepted size of the meta text, in bytes.
    MAX_META_LEN = 1024

    @property
    def generic_code(self):
        """Generic version (x0) of this response's status code."""
        return Response.get_generic_code(self.code)

    @staticmethod
    def parse(data):
        """Parse a received response.

        `data` is the raw bytes received from the server. Return a Response,
        or None when the header is missing, cannot be decoded, is malformed,
        has an oversized meta, or carries an unknown status code.
        """
        try:
            response_header_len = data.index(LINE_TERM)
            response_header = data[:response_header_len].decode()
        except ValueError:
            # No line terminator found, or the header could not be decoded
            # (UnicodeDecodeError is a ValueError).
            return None
        match = Response.HEADER_RE.match(response_header)
        if not match:
            return None
        code, meta = match.groups()
        # The limit applies to the encoded size, not the character count.
        if len(meta.encode()) > Response.MAX_META_LEN:
            return None
        response = Response(StatusCode(int(code)), meta=meta)
        if response.generic_code == StatusCode.SUCCESS:
            content_offset = response_header_len + len(LINE_TERM)
            response.content = data[content_offset:]
        elif response.code == StatusCode.UNKNOWN:
            return None
        return response

    @staticmethod
    def get_generic_code(code):
        """Return the generic version (x0) of this code."""
        return code - (code % 10)