Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# 

2# Copyright (C) 2019-2020 Leo P. Singer <leo.singer@ligo.org> 

3# 

4# This program is free software: you can redistribute it and/or modify 

5# it under the terms of the GNU General Public License as published by 

6# the Free Software Foundation, either version 3 of the License, or 

7# (at your option) any later version. 

8# 

9# This program is distributed in the hope that it will be useful, 

10# but WITHOUT ANY WARRANTY; without even the implied warranty of 

11# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

12# GNU General Public License for more details. 

13# 

14# You should have received a copy of the GNU General Public License 

15# along with this program. If not, see <https://www.gnu.org/licenses/>. 

16# 

17from os import access, environ, getuid, R_OK 

18from os.path import expanduser, join 

19from six.moves.urllib.parse import urlparse 

20 

21from safe_netrc import netrc 

22 

23from .cert_reload import CertReloadingHTTPAdapter 

24 

25 

26def _find_cert(): 

27 """Try to find a user's X509 certificate and key. 

28 

29 Checks environment variables first, then expected location for default 

30 proxy. 

31 

32 Notes 

33 ----- 

34 This function is adapted from the original ``_find_x509_credentials()`` 

35 method in https://git.ligo.org/lscsoft/gracedb-client/blob/gracedb-2.5.0/ligo/gracedb/ 

36 rest.py, which is copyright (C) Brian Moe, Branson Stephens (2015). 

37 

38 """ # noqa: E501 

39 result = tuple(environ.get(key) 

40 for key in ('X509_USER_CERT', 'X509_USER_KEY')) 

41 if all(result): 

42 return result 

43 

44 result = environ.get('X509_USER_PROXY') 

45 if result: 

46 return result 

47 

48 result = join('/tmp', 'x509up_u{}'.format(getuid())) 

49 if access(result, R_OK): 

50 return result 

51 

52 result = tuple(expanduser(join('~', '.globus', filename)) 

53 for filename in ('usercert.pem', 'userkey.pem')) 

54 if all(access(path, R_OK) for path in result): 

55 return result 

56 

57 

58def _find_username_password(url): 

59 host = urlparse(url).hostname 

60 

61 try: 

62 result = netrc().authenticators(host) 

63 except IOError: 

64 result = None 

65 

66 if result is not None: 

67 username, _, password = result 

68 result = (username, password) 

69 

70 return result 

71 

72 

73class SessionAuthMixin(object): 

74 """A mixin for :class:`requests.Session` to add support for all GraceDB 

75 authentication mechanisms. 

76 

77 Parameters 

78 ---------- 

79 url : str 

80 GraceDB Client URL. 

81 cert : str, tuple 

82 Client-side X.509 certificate. May be either a single filename 

83 if the certificate and private key are concatenated together, or 

84 a tuple of the filenames for the certificate and private key. 

85 username : str 

86 Username for basic auth. 

87 password : str 

88 Password for basic auth. 

89 force_noauth : bool, default=False 

90 If true, then do not use any authentication at all. 

91 fail_if_noauth : bool, default=False 

92 If true, then raise an exception if authentication credentials are 

93 not provided. 

94 cert_reload : bool, default=False 

95 If true, then automatically reload the client certificate before it 

96 expires. 

97 cert_reload_timeout : int, default=300 

98 Reload the certificate this many seconds before it expires. 

99 

100 Notes 

101 ----- 

102 When a new Session instance is created, the following sources of 

103 authentication are tried, in order: 

104 

105 1. If the :obj:`force_noauth` keyword argument is true, then perform no 

106 authentication at all. 

107 

108 2. If the :obj:`cert` keyword argument is provided, then use X.509 client 

109 certificate authentication. 

110 

111 3. If the :obj:`username` and :obj:`password` keyword arguments are 

112 provided, then use basic auth. 

113 

114 4. Look for a default X.509 client certificate in: 

115 

116 a. the environment variables :envvar:`X509_USER_CERT` and 

117 :envvar:`X509_USER_KEY` 

118 b. the environment variable :envvar:`X509_USER_PROXY` 

119 c. the file :file:`/tmp/x509up_u{UID}`, where :samp:`{UID}` is your 

120 numeric user ID, if the file exists and is readable 

121 d. the files :file:`~/.globus/usercert.pem` and 

122 :file:`~/.globus/userkey.pem`, if they exist and are readable 

123 

124 5. Read the netrc file [1]_ located at :file:`~/.netrc`, or at the path 

125 stored in the environment variable :envvar:`NETRC`, and look for a 

126 username and password matching the hostname in the URL. 

127 

128 6. If the :obj:`fail_if_noauth` keyword argument is true, and no 

129 authentication source was found, then raise a :class:`ValueError`. 

130 

131 References 

132 ---------- 

133 .. [1] The .netrc file. 

134 https://www.gnu.org/software/inetutils/manual/html_node/The-_002enetrc-file.html 

135 

136 """ # noqa: E501 

137 

138 def __init__(self, url=None, cert=None, username=None, password=None, 

139 force_noauth=False, fail_if_noauth=False, cert_reload=False, 

140 cert_reload_timeout=300, **kwargs): 

141 super(SessionAuthMixin, self).__init__(**kwargs) 

142 

143 # Support for reloading client certificates 

144 if cert_reload: 

145 self.mount('https://', CertReloadingHTTPAdapter( 

146 cert_reload_timeout=cert_reload_timeout)) 

147 

148 # Argument validation 

149 if fail_if_noauth and force_noauth: 

150 raise ValueError( 

151 'Must not set both force_noauth and fail_if_noauth.') 

152 if (username is None) ^ (password is None): 

153 raise ValueError('Must provide username and password, or neither.') 

154 

155 # FIXME: these should go into elif clauses below 

156 # (as in `elif default_cert := _find_x509_credentials():`) 

157 # in order to defer unnecessary I/O, but this requires 

158 # the := operator, which requires Python 3.8. 

159 default_cert = _find_cert() 

160 default_basic_auth = _find_username_password(url) 

161 

162 if force_noauth: 

163 pass 

164 elif cert is not None: 

165 self.cert = cert 

166 elif username is not None: 

167 self.auth = (username, password) 

168 elif default_cert is not None: 

169 self.cert = default_cert 

170 elif default_basic_auth is not None: 

171 self.auth = default_basic_auth 

172 elif fail_if_noauth: 

173 raise ValueError('No authentication credentials found.')