Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1#
2# Copyright (C) 2019-2020 Leo P. Singer <leo.singer@ligo.org>
3#
4# This program is free software: you can redistribute it and/or modify
5# it under the terms of the GNU General Public License as published by
6# the Free Software Foundation, either version 3 of the License, or
7# (at your option) any later version.
8#
9# This program is distributed in the hope that it will be useful,
10# but WITHOUT ANY WARRANTY; without even the implied warranty of
11# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12# GNU General Public License for more details.
13#
14# You should have received a copy of the GNU General Public License
15# along with this program. If not, see <https://www.gnu.org/licenses/>.
16#
17from os import access, environ, getuid, R_OK
18from os.path import expanduser, join
19from six.moves.urllib.parse import urlparse
21from safe_netrc import netrc
23from .cert_reload import CertReloadingHTTPAdapter
def _find_cert():
    """Locate a default X.509 client certificate for the current user.

    The candidates below are tried in order and the first match is returned:

    1. The environment variables ``X509_USER_CERT`` and ``X509_USER_KEY``,
       when both are set to non-empty values.
    2. The environment variable ``X509_USER_PROXY``, when set to a non-empty
       value.
    3. The default proxy file ``/tmp/x509up_u{UID}``, when it is readable.
    4. The files ``~/.globus/usercert.pem`` and ``~/.globus/userkey.pem``,
       when both are readable.

    Paths taken from environment variables are returned as-is; only the
    default file locations are checked for readability.

    Returns
    -------
    tuple, str, or None
        A ``(cert, key)`` tuple of filenames, a single filename, or None if
        no candidate matched.

    Notes
    -----
    This function is adapted from the original ``_find_x509_credentials()``
    method in https://git.ligo.org/lscsoft/gracedb-client/blob/gracedb-2.5.0/ligo/gracedb/rest.py,
    which is copyright (C) Brian Moe, Branson Stephens (2015).

    """  # noqa: E501
    # 1. Explicit certificate/key pair from the environment.
    env_cert = environ.get('X509_USER_CERT')
    env_key = environ.get('X509_USER_KEY')
    if env_cert and env_key:
        return (env_cert, env_key)

    # 2. Explicit proxy certificate from the environment.
    env_proxy = environ.get('X509_USER_PROXY')
    if env_proxy:
        return env_proxy

    # 3. Default proxy location, named after the numeric user ID.
    default_proxy = join('/tmp', 'x509up_u{}'.format(getuid()))
    if access(default_proxy, R_OK):
        return default_proxy

    # 4. Certificate/key pair in the user's ~/.globus directory.
    globus_cert = expanduser(join('~', '.globus', 'usercert.pem'))
    globus_key = expanduser(join('~', '.globus', 'userkey.pem'))
    if access(globus_cert, R_OK) and access(globus_key, R_OK):
        return (globus_cert, globus_key)

    return None
def _find_username_password(url):
    """Look up basic auth credentials for a URL in the user's netrc file.

    Parameters
    ----------
    url : str
        URL whose hostname is used to select the netrc entry.

    Returns
    -------
    tuple or None
        A ``(username, password)`` tuple, or None if the netrc file could
        not be read or has no entry for the hostname.

    """
    hostname = urlparse(url).hostname

    try:
        entry = netrc().authenticators(hostname)
    except IOError:
        # An unreadable or missing netrc file just means "no credentials".
        return None

    if entry is None:
        return None

    # The middle field of a netrc entry (the account) is not needed.
    login, _, secret = entry
    return (login, secret)
class SessionAuthMixin(object):
    """A mixin for :class:`requests.Session` to add support for all GraceDB
    authentication mechanisms.

    Parameters
    ----------
    url : str
        GraceDB Client URL.
    cert : str, tuple
        Client-side X.509 certificate. May be either a single filename
        if the certificate and private key are concatenated together, or
        a tuple of the filenames for the certificate and private key.
    username : str
        Username for basic auth.
    password : str
        Password for basic auth.
    force_noauth : bool, default=False
        If true, then do not use any authentication at all.
    fail_if_noauth : bool, default=False
        If true, then raise an exception if authentication credentials are
        not provided.
    cert_reload : bool, default=False
        If true, then automatically reload the client certificate before it
        expires.
    cert_reload_timeout : int, default=300
        Reload the certificate this many seconds before it expires.

    Raises
    ------
    ValueError
        If both :obj:`force_noauth` and :obj:`fail_if_noauth` are true, if
        only one of :obj:`username` and :obj:`password` is provided, or if
        :obj:`fail_if_noauth` is true and no credentials were found.

    Notes
    -----
    When a new Session instance is created, the following sources of
    authentication are tried, in order:

    1.  If the :obj:`force_noauth` keyword argument is true, then perform no
        authentication at all.

    2.  If the :obj:`cert` keyword argument is provided, then use X.509 client
        certificate authentication.

    3.  If the :obj:`username` and :obj:`password` keyword arguments are
        provided, then use basic auth.

    4.  Look for a default X.509 client certificate in:

        a.  the environment variables :envvar:`X509_USER_CERT` and
            :envvar:`X509_USER_KEY`
        b.  the environment variable :envvar:`X509_USER_PROXY`
        c.  the file :file:`/tmp/x509up_u{UID}`, where :samp:`{UID}` is your
            numeric user ID, if the file exists and is readable
        d.  the files :file:`~/.globus/usercert.pem` and
            :file:`~/.globus/userkey.pem`, if they exist and are readable

    5.  Read the netrc file [1]_ located at :file:`~/.netrc`, or at the path
        stored in the environment variable :envvar:`NETRC`, and look for a
        username and password matching the hostname in the URL.

    6.  If the :obj:`fail_if_noauth` keyword argument is true, and no
        authentication source was found, then raise a :class:`ValueError`.

    The default lookups in steps 4 and 5 are only performed when every
    earlier source is absent, so explicitly supplied credentials (or
    :obj:`force_noauth`) never touch the filesystem or the netrc file.

    References
    ----------
    .. [1] The .netrc file.
           https://www.gnu.org/software/inetutils/manual/html_node/The-_002enetrc-file.html

    """  # noqa: E501

    def __init__(self, url=None, cert=None, username=None, password=None,
                 force_noauth=False, fail_if_noauth=False, cert_reload=False,
                 cert_reload_timeout=300, **kwargs):
        super(SessionAuthMixin, self).__init__(**kwargs)

        # Support for reloading client certificates
        if cert_reload:
            self.mount('https://', CertReloadingHTTPAdapter(
                cert_reload_timeout=cert_reload_timeout))

        # Argument validation
        if fail_if_noauth and force_noauth:
            raise ValueError(
                'Must not set both force_noauth and fail_if_noauth.')
        if (username is None) ^ (password is None):
            raise ValueError('Must provide username and password, or neither.')

        if force_noauth:
            pass
        elif cert is not None:
            self.cert = cert
        elif username is not None:
            self.auth = (username, password)
        else:
            # Nothing was supplied explicitly: fall back to the defaults.
            # Each lookup is deferred until it is actually needed, so that
            # no unnecessary I/O happens and a broken netrc file cannot get
            # in the way of a usable certificate.
            default_cert = _find_cert()
            if default_cert is not None:
                self.cert = default_cert
            else:
                default_basic_auth = _find_username_password(url)
                if default_basic_auth is not None:
                    self.auth = default_basic_auth
                elif fail_if_noauth:
                    raise ValueError('No authentication credentials found.')