tofu: proper implementation
This commit is contained in:
parent
80ec71f30b
commit
1fddf4c2b2
|
@ -13,13 +13,12 @@ TODO DONE
|
|||
downloads
|
||||
configuration
|
||||
help page
|
||||
TOFU
|
||||
open last download
|
||||
actual TOFU
|
||||
home page
|
||||
media files
|
||||
view history
|
||||
identity management
|
||||
help page for keybinds
|
||||
--------------------------------------------------------------------------------
|
||||
BACKLOG
|
||||
click on links to open them
|
||||
|
|
10
README.md
10
README.md
|
@ -29,15 +29,13 @@ Why use Bebop instead of something else?
|
|||
|
||||
### Lightweight
|
||||
|
||||
It only uses a single dependency, [asn1crypto][asn1crypto], to delegate
|
||||
parsing certificates. Everything else including NCurses or TLS is done using
|
||||
Python's standard library.
|
||||
|
||||
[asn1crypto]: https://github.com/wbond/asn1crypto
|
||||
It does not use any external dependencies. Everything including NCurses or TLS
|
||||
is done using Python's standard library.
|
||||
|
||||
### Nice keybinds
|
||||
|
||||
A lot of keybinds are defined. Find them in the help page by pressing `?`.
|
||||
A lot of keybinds are defined, and Vim users should get quickly familiar with
|
||||
them. Find them in the help page by pressing `?`.
|
||||
|
||||
### Fun
|
||||
|
||||
|
|
|
@ -2,8 +2,8 @@ import argparse
|
|||
|
||||
from bebop.browser.browser import Browser
|
||||
from bebop.config import load_config
|
||||
from bebop.fs import get_config_path, get_user_data_path
|
||||
from bebop.tofu import load_cert_stash, save_cert_stash
|
||||
from bebop.fs import ensure_bebop_files_exist, get_config_path
|
||||
from bebop.tofu import get_cert_stash_path, load_cert_stash, save_cert_stash
|
||||
|
||||
|
||||
def main():
|
||||
|
@ -19,11 +19,9 @@ def main():
|
|||
config_path = get_config_path()
|
||||
config = load_config(config_path)
|
||||
|
||||
user_data_path = get_user_data_path()
|
||||
if not user_data_path.exists():
|
||||
user_data_path.mkdir()
|
||||
ensure_bebop_files_exist()
|
||||
|
||||
cert_stash_path = user_data_path / "known_hosts.txt"
|
||||
cert_stash_path = get_cert_stash_path()
|
||||
cert_stash = load_cert_stash(cert_stash_path) or {}
|
||||
try:
|
||||
Browser(config, cert_stash).run(start_url=start_url)
|
||||
|
|
|
@ -47,7 +47,7 @@ class Browser:
|
|||
|
||||
def __init__(self, config, cert_stash):
|
||||
self.config = config
|
||||
self.stash = cert_stash or {}
|
||||
self.stash = cert_stash
|
||||
self.screen = None
|
||||
self.dim = (0, 0)
|
||||
self.page_pad = None
|
||||
|
@ -272,6 +272,9 @@ class Browser:
|
|||
return
|
||||
if command in ("o", "open"):
|
||||
self.open_url(words[1], assume_absolute=True)
|
||||
elif command == "forget-certificate":
|
||||
from bebop.browser.gemini import forget_certificate
|
||||
forget_certificate(self, words[1])
|
||||
|
||||
def open_url(self, url, base_url=None, redirects=0, assume_absolute=False,
|
||||
history=True, use_cache=True):
|
||||
|
|
|
@ -8,6 +8,7 @@ from bebop.fs import get_downloads_path
|
|||
from bebop.navigation import set_parameter
|
||||
from bebop.page import Page
|
||||
from bebop.protocol import Request, Response
|
||||
from bebop.tofu import trust_fingerprint, untrust_fingerprint, WRONG_FP_ALERT
|
||||
|
||||
|
||||
MAX_URL_LEN = 1024
|
||||
|
@ -17,10 +18,30 @@ def open_gemini_url(browser: Browser, url, redirects=0, history=True,
|
|||
use_cache=True):
|
||||
"""Open a Gemini URL and set the formatted response as content.
|
||||
|
||||
After initiating the connection, TODO
|
||||
While the specification is not set in stone, every client takes a slightly
|
||||
different approach to enforcing TOFU. Read the `Request.connect` docs to
|
||||
find about cases where connection is aborted without asking the user. What
|
||||
interests us here is what happens when the user should decide herself? This
|
||||
happens in several cases, matching the request possible states. Here is
|
||||
what Bebop do (or want to do):
|
||||
|
||||
- STATE_INVALID_CERT: the certificate has non-fatal issues; we may
|
||||
present the user the problems found and let her decide whether to trust
|
||||
temporarily the certificate or not BUT we currently do not parse the
|
||||
certificate's fields, so this state is never used.
|
||||
- STATE_UNKNOWN_CERT: the certificate is valid but has not been seen before;
|
||||
as we're doing TOFU here, we could automatically trust it or let the user
|
||||
choose. For simplicity, we always trust it permanently.
|
||||
|
||||
Attributes:
|
||||
- browser: Browser object making the request.
|
||||
- url: a valid URL with Gemini scheme to open.
|
||||
- redirects: current amount of redirections done to open the initial URL.
|
||||
- history: if true, save the final URL to history.
|
||||
- use_cache: if true, look up if the page is cached before requesting it.
|
||||
"""
|
||||
if len(url) >= MAX_URL_LEN:
|
||||
browser.set_status_error(f"Request URL too long.")
|
||||
browser.set_status_error("Request URL too long.")
|
||||
return
|
||||
|
||||
browser.set_status(f"Loading {url}")
|
||||
|
@ -40,8 +61,8 @@ def open_gemini_url(browser: Browser, url, redirects=0, history=True,
|
|||
if req.state == Request.STATE_ERROR_CERT:
|
||||
error = f"Certificate was missing or corrupt ({url})."
|
||||
elif req.state == Request.STATE_UNTRUSTED_CERT:
|
||||
_handle_untrusted_cert(browser, req)
|
||||
error = f"Certificate has been changed ({url})."
|
||||
# TODO propose the user ways to handle this.
|
||||
elif req.state == Request.STATE_CONNECTION_FAILED:
|
||||
error_details = ": " + req.error if req.error else "."
|
||||
error = f"Connection failed ({url})" + error_details
|
||||
|
@ -51,13 +72,18 @@ def open_gemini_url(browser: Browser, url, redirects=0, history=True,
|
|||
return
|
||||
|
||||
if req.state == Request.STATE_INVALID_CERT:
|
||||
# TODO propose abort / temp trust
|
||||
pass
|
||||
elif req.state == Request.STATE_UNKNOWN_CERT:
|
||||
# TODO propose abort / temp trust / perm trust
|
||||
pass
|
||||
else:
|
||||
pass # TODO
|
||||
# Certificate is valid but unknown: trust it permanently.
|
||||
hostname = req.hostname
|
||||
fingerprint = req.cert_validation["hash"]
|
||||
trust_fingerprint(
|
||||
browser.stash,
|
||||
hostname,
|
||||
"SHA-512",
|
||||
fingerprint,
|
||||
trust_always=True
|
||||
)
|
||||
|
||||
data = req.proceed()
|
||||
if not data:
|
||||
|
@ -71,6 +97,26 @@ def open_gemini_url(browser: Browser, url, redirects=0, history=True,
|
|||
_handle_response(browser, response, url, redirects, history)
|
||||
|
||||
|
||||
def _handle_untrusted_cert(browser: Browser, request: Request):
|
||||
"""Handle a mismatch between known & server fingerprints.
|
||||
|
||||
This function formats an alert page to explain to the user what the hell is
|
||||
going on and displays it.
|
||||
"""
|
||||
remote_fp = request.cert_validation["hash"]
|
||||
local_fp = request.cert_validation["saved_hash"]
|
||||
alert_page_source = WRONG_FP_ALERT.format(
|
||||
hostname=request.hostname,
|
||||
local_fp=local_fp,
|
||||
remote_fp=remote_fp,
|
||||
)
|
||||
alert_page = Page.from_gemtext(
|
||||
alert_page_source,
|
||||
browser.config["text_width"]
|
||||
)
|
||||
browser.load_page(alert_page)
|
||||
|
||||
|
||||
def _handle_response(browser: Browser, response: Response, url: str,
|
||||
redirects: int, history: bool):
|
||||
"""Handle a response from a Gemini server."""
|
||||
|
@ -167,3 +213,15 @@ def _handle_input_request(browser: Browser, from_url: str, message: str =None):
|
|||
return
|
||||
url = set_parameter(from_url, user_input)
|
||||
open_gemini_url(browser, url)
|
||||
|
||||
|
||||
def forget_certificate(browser: Browser, hostname: str):
|
||||
"""Remove the fingerprint associated to this hostname for the cert stash."""
|
||||
key = browser.prompt(f"Remove fingerprint from {hostname}? [y/N]", "ynN")
|
||||
if key != "y":
|
||||
browser.reset_status()
|
||||
return
|
||||
if untrust_fingerprint(browser.stash, hostname):
|
||||
browser.set_status(f"Known certificate for {hostname} removed.")
|
||||
else:
|
||||
browser.set_status_error(f"Known certificate for {hostname} not found.")
|
||||
|
|
|
@ -208,7 +208,11 @@ class CommandLine:
|
|||
"""Handle input chars and raise a terminate interrupt on a valid key."""
|
||||
# Handle common keys.
|
||||
ch = self._validate_common_input(ch)
|
||||
try:
|
||||
char = chr(ch)
|
||||
except ValueError:
|
||||
pass
|
||||
else:
|
||||
if char in keys:
|
||||
raise TerminateCommandInterrupt(char)
|
||||
return 0
|
||||
|
|
|
@ -45,3 +45,11 @@ def get_downloads_path() -> Path:
|
|||
if download_path:
|
||||
return Path(download_path)
|
||||
return Path.home()
|
||||
|
||||
|
||||
def ensure_bebop_files_exist():
|
||||
"""Ensure various Bebop's files or directories are present."""
|
||||
# Ensure the user data directory exists.
|
||||
user_data_path = get_user_data_path()
|
||||
if not user_data_path.exists():
|
||||
user_data_path.mkdir(parents=True)
|
||||
|
|
|
@ -8,19 +8,13 @@ from enum import IntEnum
|
|||
from typing import Optional
|
||||
|
||||
from bebop.mime import DEFAULT_MIME_TYPE, MimeType
|
||||
from bebop.tofu import CertStatus, CERT_STATUS_INVALID, validate_cert
|
||||
from bebop.tofu import CertStatus, validate_cert
|
||||
|
||||
|
||||
GEMINI_URL_RE = re.compile(r"gemini://(?P<host>[^/]+)(?P<path>.*)")
|
||||
LINE_TERM = b"\r\n"
|
||||
|
||||
|
||||
def parse_gemini_url(url):
|
||||
"""Return a dict containing the hostname and the request path, or None."""
|
||||
match = GEMINI_URL_RE.match(url)
|
||||
return match.groupdict() if match else None
|
||||
|
||||
|
||||
class Request:
|
||||
"""A Gemini request.
|
||||
|
||||
|
@ -30,8 +24,21 @@ class Request:
|
|||
sending the request header and receiving the response:
|
||||
|
||||
1. Instantiate a Request.
|
||||
2. `connect` opens the connection, leaves the caller free to check stuff.
|
||||
2. `connect` opens the connection and aborts it or leaves the caller free to
|
||||
check stuff.
|
||||
3. `proceed` or `abort` can be called.
|
||||
|
||||
Attributes:
|
||||
- url: URL to open.
|
||||
- cert_stash: certificate stash to use an possibly update.
|
||||
- state: request state.
|
||||
- hostname: hostname derived from url, stored when `connect` is called.
|
||||
- payload: bytes object of the payload request; build during `connect`, used
|
||||
during `proceed`.
|
||||
- ssock: TLS-wrapped socket.
|
||||
- cert_validation: validation results dict, set after certificate has been
|
||||
reviewed.
|
||||
- error: human-readable connection error, may be set during `connect`.
|
||||
"""
|
||||
|
||||
# Initial state, connection is not established yet.
|
||||
|
@ -55,28 +62,69 @@ class Request:
|
|||
self.url = url
|
||||
self.cert_stash = cert_stash
|
||||
self.state = Request.STATE_INIT
|
||||
self.hostname = ""
|
||||
self.payload = b""
|
||||
self.ssock = None
|
||||
self.cert = None
|
||||
self.cert_status = None
|
||||
self.cert_validation = None
|
||||
self.error = ""
|
||||
|
||||
def connect(self, timeout):
|
||||
def connect(self, timeout: int) -> bool:
|
||||
"""Connect to a Gemini server and return a RequestEventType.
|
||||
|
||||
Return True if the connection is established. The caller has to verify
|
||||
the request state and propose appropriate choices to the user if the
|
||||
certificate status is not CertStatus.VALID (Request.STATE_OK).
|
||||
|
||||
If connect returns False, the secure socket is aborted before return. If
|
||||
connect returns True, it is up to the caller to decide whether to
|
||||
continue (call proceed) the connection or abort it (call abort).
|
||||
If connect returns False, the secure socket is aborted before return so
|
||||
there is no need to call `abort`. If connect returns True, it is up to the
|
||||
caller to decide whether to continue (call `proceed`) the connection or
|
||||
abort it (call `abort`).
|
||||
|
||||
The request `state` is updated to reflect the connection state after the
|
||||
function returns. The following list describes states related to
|
||||
connection failure (False returned):
|
||||
|
||||
- STATE_INVALID_URL: URL is not valid.
|
||||
- STATE_CONNECTION_FAILED: connection failed, either TCP timeout or
|
||||
local TLS failure. Additionally, the request `error` attribute is set
|
||||
to an error string describing the issue.
|
||||
|
||||
For all request states from now on, the `cert_validation` attribute is
|
||||
updated with the result of the certificate validation.
|
||||
|
||||
The following list describes states related to validation failure (False
|
||||
returned):
|
||||
|
||||
- STATE_ERROR_CERT: server certificate could not be validated at all.
|
||||
- STATE_UNTRUSTED_CERT: server certificate mismatched the known
|
||||
certificate for that hostname. The user should be presented with
|
||||
options to solve the matter.
|
||||
|
||||
For other states, the connection is not aborted (True returned):
|
||||
|
||||
- STATE_INVALID_CERT: the certificate has one or more issues, e.g.
|
||||
mismatching hostname or it is expired.
|
||||
- STATE_UNKNOWN_CERT: the certificate is valid but unknown.
|
||||
- STATE_OK: the certificate is valid and matches the known certificate
|
||||
of that hostname.
|
||||
|
||||
After this function returns, the request state cannot be STATE_INIT.
|
||||
|
||||
Additional notes:
|
||||
|
||||
- The DER hash is compared against the fingerprint for this hostname
|
||||
*and port*; the specification does not tell much about that, but we
|
||||
are slightly more restrictive here by adding the port in the equation.
|
||||
- The state STATE_INVALID_CERT is actually never used in Bebop because
|
||||
of the current tendency to ignore any certificate fields and only
|
||||
check the whole cert fingerprint. Here it is considered the same as a
|
||||
valid certificate.
|
||||
"""
|
||||
url_parts = parse_gemini_url(self.url)
|
||||
url_parts = GEMINI_URL_RE.match(self.url)
|
||||
if not url_parts:
|
||||
self.state = Request.STATE_INVALID_URL
|
||||
return False
|
||||
hostname = url_parts["host"]
|
||||
hostname = url_parts.groupdict()["host"]
|
||||
if ":" in hostname:
|
||||
hostname, port = hostname.split(":", maxsplit=1)
|
||||
try:
|
||||
|
@ -86,6 +134,7 @@ class Request:
|
|||
return False
|
||||
else:
|
||||
port = 1965
|
||||
self.hostname = hostname
|
||||
|
||||
try:
|
||||
self.payload = self.url.encode()
|
||||
|
@ -105,27 +154,26 @@ class Request:
|
|||
try:
|
||||
self.ssock = context.wrap_socket(sock, server_hostname=hostname)
|
||||
except OSError as exc:
|
||||
sock.close()
|
||||
self.state = Request.STATE_CONNECTION_FAILED
|
||||
self.error = exc.strerror
|
||||
return False
|
||||
|
||||
der = self.ssock.getpeercert(binary_form=True)
|
||||
self.cert_status, self.cert = \
|
||||
validate_cert(der, hostname, self.cert_stash)
|
||||
if self.cert_status == CertStatus.ERROR:
|
||||
self.cert_validation = validate_cert(der, hostname, self.cert_stash)
|
||||
cert_status = self.cert_validation["status"]
|
||||
if cert_status == CertStatus.ERROR:
|
||||
self.abort()
|
||||
self.state = Request.STATE_ERROR_CERT
|
||||
return False
|
||||
if self.cert_status == CertStatus.WRONG_FINGERPRINT:
|
||||
if cert_status == CertStatus.WRONG_FINGERPRINT:
|
||||
self.abort()
|
||||
self.state = Request.STATE_UNTRUSTED_CERT
|
||||
return False
|
||||
|
||||
if self.cert_status in CERT_STATUS_INVALID:
|
||||
self.state = Request.STATE_INVALID_CERT
|
||||
elif self.cert_status == CertStatus.VALID_NEW:
|
||||
if cert_status == CertStatus.VALID_NEW:
|
||||
self.state = Request.STATE_UNKNOWN_CERT
|
||||
else: # self.cert_status == CertStatus.VALID
|
||||
else: # self.cert_status in (VALID, VALID_NEW, INVALID_CERT)
|
||||
self.state = Request.STATE_OK
|
||||
return True
|
||||
|
||||
|
@ -232,6 +280,6 @@ class Response:
|
|||
return response
|
||||
|
||||
@staticmethod
|
||||
def get_generic_code(code) -> int:
|
||||
def get_generic_code(code: int) -> int:
|
||||
"""Return the generic version (x0) of this code."""
|
||||
return code - (code % 10)
|
||||
|
|
|
@ -1,9 +1,9 @@
|
|||
import unittest
|
||||
|
||||
from ..rendering import _explode_words, _find_next_sep, wrap_words
|
||||
from ..metalines import _explode_words, _find_next_sep, wrap_words
|
||||
|
||||
|
||||
class TestRenderer(unittest.TestCase):
|
||||
class TestMetalines(unittest.TestCase):
|
||||
|
||||
def test_wrap_words(self):
|
||||
t = "wrap me wrap me youcantwrapthisonewithoutforce bla bla bla bla"
|
|
@ -1,10 +0,0 @@
|
|||
import unittest
|
||||
|
||||
from ..protocol import parse_gemini_url
|
||||
|
||||
|
||||
class TestGemini(unittest.TestCase):
|
||||
|
||||
def test_parse_url(self):
|
||||
r1 = parse_gemini_url("gemini://dece.space")
|
||||
self.assertDictEqual(r1, {"host": "dece.space", "path": ""})
|
149
bebop/tofu.py
149
bebop/tofu.py
|
@ -4,19 +4,70 @@ As of writing there is still some debate around it, so it is quite messy and
|
|||
requires more clarity both in specification and in our own implementation.
|
||||
"""
|
||||
|
||||
import datetime
|
||||
import hashlib
|
||||
import re
|
||||
from enum import Enum
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, Optional
|
||||
|
||||
import asn1crypto.x509
|
||||
from bebop.fs import get_user_data_path
|
||||
|
||||
|
||||
STASH_LINE_RE = re.compile(r"(\S+) (\S+) (\S+) (\d+)")
|
||||
STASH_LINE_RE = re.compile(r"(\S+) (\S+) (\S+)")
|
||||
|
||||
WRONG_FP_ALERT = """\
|
||||
The request could not complete because the certificate presented by the server \
|
||||
does not match the certificate stored in the local stash.
|
||||
|
||||
``` details of the fingerprint mismatch
|
||||
Hostname: {hostname}
|
||||
Local fingerprint: {local_fp}
|
||||
Server fingerprint: {remote_fp}
|
||||
```
|
||||
|
||||
If you are sure this new certificate can be trusted, press ":" and type the \
|
||||
following command to remove the previous certificate from the local stash, \
|
||||
then retry your request:
|
||||
|
||||
``` command to use to forget about the previous certificate
|
||||
forget-certificate {hostname}
|
||||
```
|
||||
|
||||
You can also manually remove the certificate line from the known hosts file in \
|
||||
your user data directory.
|
||||
|
||||
## FAQ
|
||||
|
||||
### What is this mismatch about?
|
||||
|
||||
Gemini uses TOFU (Trust On First Use) to verify the identity of the server you \
|
||||
are visiting. It means that the first time you visited this capsule, it showed \
|
||||
you its unique ID, but this time the ID is different, so the trust is broken.
|
||||
|
||||
Capsule owners often tell in advance when they are about the use a new \
|
||||
certificate, but they may have forgotten or you may have missed it. Maybe the \
|
||||
old certificate expired and/or has been replaced for another reason (e.g. \
|
||||
using a far away expiration time, borking certificates during a migration, …)
|
||||
|
||||
### Am I being hacked?
|
||||
|
||||
Probably not, but if you are visiting a sensitive capsule, make sure you're \
|
||||
confident enough before trusting this new certificate.
|
||||
|
||||
### How to ensure this new certificate can be trusted?
|
||||
|
||||
Can you join the owner through mail or instant messaging? This is the simplest \
|
||||
way for you to make sure that the server is fine, and maybe alert the owner on \
|
||||
a problem on his server she did not notice.
|
||||
"""
|
||||
|
||||
|
||||
def load_cert_stash(stash_path: Path):
|
||||
def get_cert_stash_path() -> Path:
|
||||
"""Return the default certificate stash path."""
|
||||
return get_user_data_path() / "known_hosts.txt"
|
||||
|
||||
|
||||
def load_cert_stash(stash_path: Path) -> Optional[Dict]:
|
||||
"""Load the certificate stash from the file, or None on error.
|
||||
|
||||
The stash is a dict with host names as keys and tuples as values. Tuples
|
||||
|
@ -36,8 +87,8 @@ def load_cert_stash(stash_path: Path):
|
|||
match = STASH_LINE_RE.match(line)
|
||||
if not match:
|
||||
continue
|
||||
name, algo, fingerprint, timestamp = match.groups()
|
||||
stash[name] = (algo, fingerprint, timestamp, True)
|
||||
name, algo, fingerprint = match.groups()
|
||||
stash[name] = (algo, fingerprint, True)
|
||||
except (OSError, ValueError):
|
||||
return None
|
||||
return stash
|
||||
|
@ -47,71 +98,67 @@ def save_cert_stash(stash: dict, stash_path: Path):
|
|||
"""Save the certificate stash."""
|
||||
try:
|
||||
with open(stash_path, "wt") as stash_file:
|
||||
for name, entry in stash.values():
|
||||
algo, fingerprint, timestamp, is_permanent = entry
|
||||
for name, entry in stash.items():
|
||||
algo, fingerprint, is_permanent = entry
|
||||
if not is_permanent:
|
||||
continue
|
||||
entry_line = f"{name} {algo} {fingerprint} {timestamp}\n"
|
||||
entry_line = f"{name} {algo} {fingerprint}\n"
|
||||
stash_file.write(entry_line)
|
||||
except (OSError, ValueError):
|
||||
pass
|
||||
except (OSError, ValueError) as exc:
|
||||
print(f"Failed to save certificate stash '{stash_path}': {exc}")
|
||||
|
||||
|
||||
class CertStatus(Enum):
|
||||
"""Value returned by validate_cert."""
|
||||
# Cert is valid: proceed.
|
||||
VALID = 0 # Known and valid.
|
||||
VALID_NEW = 7 # New and valid.
|
||||
VALID_NEW = 1 # New and valid.
|
||||
# Cert is unusable or wrong: abort.
|
||||
ERROR = 1 # General error.
|
||||
WRONG_FINGERPRINT = 2 # Fingerprint in the stash is different.
|
||||
# Cert has some issues: ask to proceed.
|
||||
NOT_VALID_YET = 3 # not-before date invalid.
|
||||
EXPIRED = 4 # not-after date invalid.
|
||||
BAD_DOMAIN = 5 # Host name is not in cert's valid domains.
|
||||
ERROR = 2 # General error.
|
||||
WRONG_FINGERPRINT = 3 # Fingerprint in the stash is different.
|
||||
|
||||
|
||||
CERT_STATUS_INVALID = (
|
||||
CertStatus.NOT_VALID_YET,
|
||||
CertStatus.EXPIRED,
|
||||
CertStatus.BAD_DOMAIN,
|
||||
)
|
||||
def validate_cert(der, hostname, cert_stash) -> Dict[str, Any]:
|
||||
"""Return a dict containing validation info for this certificate.
|
||||
|
||||
|
||||
def validate_cert(der, hostname, cert_stash):
|
||||
"""Return a tuple (CertStatus, Certificate) for this certificate."""
|
||||
Returns:
|
||||
The validation dict can contain two keys:
|
||||
- status: CertStatus, always present.
|
||||
- hash: DER hash to be used as certificate fingerprint, present if status is
|
||||
not CertStatus.ERROR.
|
||||
- saved_hash: fingerprint for this hostname in the local stash, present if
|
||||
status is CertStatus.WRONG_FINGERPRINT.
|
||||
"""
|
||||
if der is None:
|
||||
return CertStatus.ERROR, None
|
||||
try:
|
||||
cert = asn1crypto.x509.Certificate.load(der)
|
||||
except ValueError:
|
||||
return CertStatus.ERROR, None
|
||||
return {"status": CertStatus.ERROR}
|
||||
|
||||
# Check for sane parameters.
|
||||
now = datetime.datetime.now(tz=datetime.timezone.utc)
|
||||
if now < cert.not_valid_before:
|
||||
return CertStatus.NOT_VALID_YET, cert
|
||||
if now > cert.not_valid_after:
|
||||
return CertStatus.EXPIRED, cert
|
||||
if hostname not in cert.valid_domains:
|
||||
return CertStatus.BAD_DOMAIN, cert
|
||||
known = False
|
||||
|
||||
# Check the entire certificate fingerprint.
|
||||
cert_hash = hashlib.sha512(der).hexdigest()
|
||||
result = {"hash": cert_hash} # type: Dict[str, Any]
|
||||
if hostname in cert_stash:
|
||||
_, fingerprint, timestamp, _ = cert_stash[hostname]
|
||||
if timestamp >= now.timestamp():
|
||||
_, fingerprint, _ = cert_stash[hostname]
|
||||
if cert_hash != fingerprint:
|
||||
return CertStatus.WRONG_FINGERPRINT, cert
|
||||
else:
|
||||
# Disregard expired fingerprints.
|
||||
pass
|
||||
return CertStatus.VALID, cert
|
||||
result.update(
|
||||
status=CertStatus.WRONG_FINGERPRINT,
|
||||
saved_hash=fingerprint
|
||||
)
|
||||
return result
|
||||
known = True
|
||||
|
||||
# The certificate is unknown and valid.
|
||||
return CertStatus.VALID_NEW, cert
|
||||
result.update(status=CertStatus.VALID if known else CertStatus.VALID_NEW)
|
||||
return result
|
||||
|
||||
|
||||
def trust(cert_stash, hostname, algo, fingerprint, timestamp,
|
||||
trust_always=False):
|
||||
cert_stash[hostname] = (algo, fingerprint, timestamp, trust_always)
|
||||
def trust_fingerprint(stash, hostname, algo, fingerprint, trust_always=False):
|
||||
"""Add a fingerprint entry to this stash."""
|
||||
stash[hostname] = (algo, fingerprint, trust_always)
|
||||
|
||||
|
||||
def untrust_fingerprint(stash, hostname):
|
||||
"""Remove a fingerprint entry from this stash; return True on deletion."""
|
||||
if hostname in stash:
|
||||
del stash[hostname]
|
||||
return True
|
||||
return False
|
||||
|
|
|
@ -1 +0,0 @@
|
|||
asn1crypto
|
Reference in a new issue