Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1#
2# Copyright (C) 2019-2020 Leo P. Singer <leo.singer@ligo.org>
3#
4# This program is free software: you can redistribute it and/or modify
5# it under the terms of the GNU General Public License as published by
6# the Free Software Foundation, either version 3 of the License, or
7# (at your option) any later version.
8#
9# This program is distributed in the hope that it will be useful,
10# but WITHOUT ANY WARRANTY; without even the implied warranty of
11# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12# GNU General Public License for more details.
13#
14# You should have received a copy of the GNU General Public License
15# along with this program. If not, see <https://www.gnu.org/licenses/>.
16#
17"""HTTPS adapter to close connections with expired client certificates."""
18from __future__ import absolute_import
19from datetime import datetime, timedelta
20from functools import partial
22from cryptography.hazmat.backends import default_backend
23from cryptography.x509 import load_pem_x509_certificate
24from requests.packages.urllib3.connection import HTTPSConnection
25from requests.packages.urllib3.connectionpool import (HTTPConnectionPool,
26 HTTPSConnectionPool)
27from requests.adapters import HTTPAdapter
# Backend object handed to the PEM parser; created once at import time.
_backend = default_backend()


def load_x509_certificate(filename):
    """Read a PEM-encoded X.509 certificate from disk and parse it.

    Parameters
    ----------
    filename : str
        Path of the certificate file. The file is opened in binary mode.

    Returns
    -------
    cert : cryptography.x509.Certificate
        The certificate object produced by ``load_pem_x509_certificate``.

    """
    with open(filename, 'rb') as stream:
        pem_bytes = stream.read()
    cert = load_pem_x509_certificate(pem_bytes, _backend)
    return cert
class _CertReloadingHTTPSConnection(HTTPSConnection):
    """HTTPS connection that remembers when its client certificate expires.

    Every call to :meth:`connect` re-reads the certificate named by
    ``self.cert_file`` and records its expiration time, so that the owning
    pool can ask :attr:`cert_has_expired` before reusing the connection.

    Parameters
    ----------
    host : str
        Forwarded to the parent connection class.
    cert_reload_timeout : int or float
        The certificate is reported as expired once it has this many seconds
        or fewer of validity left.
    **kwargs
        Forwarded unchanged to the parent connection class.

    """

    def __init__(self, host, cert_reload_timeout=0, **kwargs):
        super(_CertReloadingHTTPSConnection, self).__init__(host, **kwargs)
        # Until connect() has read a certificate, pretend it never expires so
        # that a freshly created connection is always considered usable.
        self._not_valid_after = datetime.max
        self._reload_timeout = timedelta(seconds=cert_reload_timeout)

    @property
    def cert_has_expired(self):
        """bool: True if the recorded expiration time is within the reload
        timeout of the current time (or already in the past)."""
        # Both operands are naive datetimes expressed in UTC.
        remaining = self._not_valid_after - datetime.utcnow()
        return remaining <= self._reload_timeout

    def connect(self):
        """Record the certificate's expiration time, then connect as usual."""
        if self.cert_file:
            cert = load_x509_certificate(self.cert_file)
            try:
                # Preferred accessor in cryptography >= 42; it returns a
                # timezone-aware UTC datetime.
                expiry = cert.not_valid_after_utc
            except AttributeError:
                # Older cryptography releases only offer the naive accessor
                # (deprecated since 42.0).
                self._not_valid_after = cert.not_valid_after
            else:
                # Drop the tzinfo so the value stays comparable with the
                # naive datetime.utcnow() used in cert_has_expired.
                self._not_valid_after = expiry.replace(tzinfo=None)
        super(_CertReloadingHTTPSConnection, self).connect()
class _CertReloadingHTTPSConnectionPool(HTTPSConnectionPool):
    """HTTPS connection pool that refuses to hand out connections whose
    client certificate has expired or is about to.

    Parameters
    ----------
    host : str
        Forwarded to the parent pool class.
    port : int, optional
        Forwarded to the parent pool class.
    cert_reload_timeout : int or float
        Stored in the connection keyword arguments so that every connection
        created by this pool receives it.
    **kwargs
        Forwarded unchanged to the parent pool class.

    """

    ConnectionCls = _CertReloadingHTTPSConnection

    def __init__(self, host, port=None, cert_reload_timeout=0, **kwargs):
        super(_CertReloadingHTTPSConnectionPool, self).__init__(
            host, port=port, **kwargs)
        self.conn_kw.update(cert_reload_timeout=cert_reload_timeout)

    def _get_conn(self, timeout=None):
        """Return a connection whose certificate is not reported expired.

        Connections that report an expired certificate are closed and
        another one is requested from the parent pool.
        """
        fetch = super(_CertReloadingHTTPSConnectionPool, self)._get_conn
        candidate = fetch(timeout)
        # Note: this loop is guaranteed to terminate because, even if the
        # pool is completely drained, when we create a new connection, its
        # `_not_valid_after` attribute is set to `datetime.max`, so
        # `cert_has_expired` evaluates to `False` and the loop exits.
        while candidate.cert_has_expired:
            candidate.close()
            candidate = fetch(timeout)
        return candidate
class CertReloadingHTTPAdapter(HTTPAdapter):
    """A transport adapter for :class:`requests.Session` that automatically
    reloads the client X.509 certificate if the version that is in use by a
    pooled connection is going to expire soon.

    Parameters
    ----------
    cert_reload_timeout : int
        Reload the certificate if it expires within this many seconds from
        now.
    **kwargs
        Forwarded unchanged to :class:`requests.adapters.HTTPAdapter`.

    """

    # Include the timeout in the pickled state so that it is restored before
    # the pool manager is rebuilt on unpickling.
    __attrs__ = HTTPAdapter.__attrs__ + ['_cert_reload_timeout']

    def __init__(self, cert_reload_timeout=0, **kwargs):
        # Must be set before the parent constructor runs, because the parent
        # constructor calls init_poolmanager(), which reads it.
        self._cert_reload_timeout = cert_reload_timeout
        super(CertReloadingHTTPAdapter, self).__init__(**kwargs)

    def init_poolmanager(self, *args, **kwargs):
        """Create the pool manager and install the cert-reloading HTTPS pool.

        Doing this here rather than in ``__init__`` keeps the custom pool
        class in place whenever the pool manager is recreated, for example
        when the adapter is unpickled.
        """
        super(CertReloadingHTTPAdapter, self).init_poolmanager(
            *args, **kwargs)
        https_pool_cls = partial(
            _CertReloadingHTTPSConnectionPool,
            cert_reload_timeout=self._cert_reload_timeout)
        # Assign a fresh dict on the instance so the mapping shared by other
        # pool managers is left untouched.
        self.poolmanager.pool_classes_by_scheme = {
            'http': HTTPConnectionPool,
            'https': https_pool_cls}