Coverage for gwcelery/tasks/condor.py: 82%

98 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-04-25 18:01 +0000

1"""Submit and monitor HTCondor jobs [1]_. 

2 

3Notes 

4----- 

5Internally, we use the XML condor log format [2]_ for easier parsing. 

6 

7References 

8---------- 

9.. [1] http://research.cs.wisc.edu/htcondor/manual/latest/condor_submit.html 

10.. [2] http://research.cs.wisc.edu/htcondor/classad/refman/node3.html 

11 

12""" 

13import os 

14import subprocess 

15import tempfile 

16from distutils.dir_util import mkpath 

17 

18import lxml.etree 

19 

20from .. import app 

21 

22 

def _escape_arg(arg):
    """Quote a single command line argument for an HTCondor submit file.

    Every double quote and single quote character in ``arg`` is doubled. If
    the result contains a space or a tab, it is additionally wrapped in
    single quotes.
    """
    escaped = arg.replace('"', '""').replace("'", "''")
    has_whitespace = any(ws in escaped for ws in (' ', '\t'))
    return "'" + escaped + "'" if has_whitespace else escaped

29 

30 

def _escape_args(args):
    """Quote a list of command line arguments for an HTCondor submit file.

    Each argument is escaped with ``_escape_arg``, the results are joined
    with single spaces, and the whole string is wrapped in double quotes.
    """
    joined = ' '.join(map(_escape_arg, args))
    return '"' + joined + '"'

34 

35 

def _mklog(suffix):
    """Create a unique path for an HTCondor log.

    Parameters
    ----------
    suffix : str
        File name suffix (e.g. ``'.log'``) for the generated path.

    Returns
    -------
    str
        A unique path inside ``~/.cache/condor``. The directory is created
        if necessary; the file itself does not exist when this returns.
    """
    condor_dir = os.path.expanduser('~/.cache/condor')
    # Use os.makedirs rather than distutils.dir_util.mkpath: distutils is
    # deprecated (PEP 632) and removed in Python 3.12, and mkpath remembers
    # directories it has created, so it does not recreate one that was deleted
    # while the process is running.
    os.makedirs(condor_dir, exist_ok=True)
    with tempfile.NamedTemporaryFile(dir=condor_dir, suffix=suffix) as f:
        # Only the unique name is wanted; the temporary file is deleted when
        # the ``with`` block exits.
        return f.name

42 

43 

def _read(filename):
    """Return the entire contents of the text file at ``filename``."""
    with open(filename, 'r') as stream:
        contents = stream.read()
    return contents

47 

48 

def _rm_f(*args):
    """Remove each of the given paths, ignoring any that cannot be removed.

    Any :class:`OSError` raised by :func:`os.remove` (for example because the
    file does not exist) is suppressed, so the remaining paths are still
    processed.
    """
    for path in args:
        try:
            os.remove(path)
        except OSError:
            # Best-effort cleanup: move on to the next path.
            continue

55 

56 

def _parse_classad(c):
    """Turn a ClassAd XML fragment into a dictionary of Python values.

    Note that this supports only the small subset of the ClassAd XML
    syntax [2]_ that we need to determine if a job succeeded or failed.

    Parameters
    ----------
    c : element or None
        A ``<c>`` element whose ``<a n="...">`` children each wrap exactly
        one value element. If None, nothing is yielded.

    Yields
    ------
    key, value : tuple
        The attribute name and its value: :class:`str` for ``<s>``,
        :class:`bool` for ``<b>`` and :class:`int` for ``<i>``. Attributes
        holding any other value type are skipped.

    Raises
    ------
    ValueError
        If an ``<a>`` element does not have exactly one child.
    """
    if c is None:
        return
    for a in c.findall('a'):
        key = a.attrib['n']
        # Iterate over the element itself instead of calling the deprecated
        # ``getchildren()`` method, which no longer exists in the standard
        # library's ElementTree API.
        child, = a
        tag = child.tag
        if tag == 's':
            value = str(child.text)
        elif tag == 'b':
            value = (child.attrib['v'] == 't')
        elif tag == 'i':
            value = int(child.text)
        else:
            # Coverage skipped below because the Python compiler optimizes
            # away ``continue`` statements.
            #
            # See <https://bitbucket.org/ned/coveragepy/issues/198>.
            continue  # pragma: no cover
        yield key, value

80 

81 

def _read_last_event(log):
    """Get the last event from an HTCondor log file.

    The log contents are wrapped in a synthetic ``<classads>`` root element
    so that they can be parsed as one XML document, and the last ``<c>``
    element is converted to a dictionary. The dictionary is empty if the log
    contains no ``<c>`` element.

    FIXME: It would be more efficient in terms of I/O and file descriptors to
    use a single HTCondor log file for all jobs and use the inotify
    capabilities of ``htcondor.read_events`` to avoid unnecessary polling.
    """
    document = '<classads>' + _read(log) + '</classads>'
    last_classad = lxml.etree.fromstring(document).find('c[last()]')
    return dict(_parse_classad(last_classad))

91 

92 

def _submit(submit_file=None, **kwargs):
    """Run ``condor_submit``.

    Parameters
    ----------
    submit_file : str, optional
        Path of the submit file. If None, ``/dev/null`` is used as the submit
        file and ``-queue 1`` is added to the command line.
    **kwargs
        Each ``key=value`` pair is passed to ``condor_submit`` with
        ``-append``.

    Raises
    ------
    subprocess.CalledProcessError
        If ``condor_submit`` exits with a nonzero status.
    """
    command = ['condor_submit']
    for key, value in kwargs.items():
        command.extend(('-append', '{}={}'.format(key, value)))
    if submit_file is not None:
        command.append(submit_file)
    else:
        command.extend(('/dev/null', '-queue', '1'))
    subprocess.run(command, capture_output=True, check=True)

102 

103 

class JobAborted(Exception):
    """Raised when an HTCondor job was aborted (e.g. by ``condor_rm``)."""

106 

107 

class JobRunning(Exception):
    """Raised when an HTCondor job has not finished yet."""

110 

111 

class JobFailed(subprocess.CalledProcessError):
    """Raised when an HTCondor job fails.

    Takes the same arguments as :class:`subprocess.CalledProcessError`.
    """

114 

115 

# Keyword arguments passed to ``app.task`` when declaring the ``submit`` task.
submit_kwargs = {
    'bind': True,
    'autoretry_for': (JobRunning,),
    'ignore_result': True,
    'shared': False,
}

120 

121 

@app.task(
    **submit_kwargs,
    **app.conf['condor_retry_kwargs']
)
def submit(self, submit_file, log=None):
    """Submit a job using HTCondor.

    On the first invocation the job is submitted and the task is retried
    with the path of the job's log file. Every later invocation inspects the
    last event in that log to decide whether the job has finished.

    Parameters
    ----------
    submit_file : str
        Path of the submit file.
    log: str
        Used internally to track job state. Caller should not set.

    Raises
    ------
    :class:`JobAborted`
        If the job was aborted (e.g. by running ``condor_rm``).
    :class:`JobFailed`
        If the job terminates abnormally or returns a nonzero exit code.
    :class:`JobRunning`
        If the job is still running. Causes the task to be re-queued until the
        job is complete.
    :class:`subprocess.CalledProcessError`
        If ``condor_submit`` itself fails.

    Example
    -------
    >>> submit.s('example.sub')
    """
    if log is None:
        # First invocation: submit the job, then retry with the log path so
        # that subsequent invocations poll the job state.
        log = _mklog('.log')
        try:
            _submit(submit_file, log_xml='true', log=log)
        except subprocess.CalledProcessError:
            _rm_f(log)
            raise
        self.retry((submit_file,), dict(log=log))
    else:
        event = _read_last_event(log)
        event_type = event.get('MyType')
        if event_type == 'JobTerminatedEvent':
            _rm_f(log)
            if not event.get('TerminatedNormally'):
                # An abnormally terminated job must be reported as a failure
                # instead of being silently treated as success. Follow the
                # subprocess convention of a negative return code for a
                # process killed by a signal.
                # NOTE(review): assumes HTCondor records the signal number as
                # ``TerminatedBySignal`` -- confirm against the job event log
                # documentation. If it is absent, fall back to the logged
                # return value, or to 1 if there is none.
                signum = event.get('TerminatedBySignal')
                returncode = -signum if signum else (
                    event.get('ReturnValue') or 1)
                raise JobFailed(returncode, (submit_file,))
            if event['ReturnValue'] != 0:
                raise JobFailed(event['ReturnValue'], (submit_file,))
        elif event_type == 'JobAbortedEvent':
            _rm_f(log)
            raise JobAborted(event)
        else:
            # Any other (or no) event means the job has not finished yet.
            raise JobRunning(event)

170 

171 

@app.task(bind=True, autoretry_for=(JobRunning,), default_retry_delay=1,
          max_retries=None, retry_backoff=True, shared=False)
def check_output(self, args, log=None, error=None, output=None, **kwargs):
    """Call a process using HTCondor.

    Call an external process using HTCondor, in a manner patterned after
    :meth:`subprocess.check_output`. If successful, returns its output on
    stdout. On failure, raise an exception.

    Parameters
    ----------
    args : list
        Command line arguments, as if passed to :func:`subprocess.check_call`.
    log, error, output : str
        Used internally to track job state. Caller should not set.
    **kwargs
        Extra submit description file commands. See the documentation for
        ``condor_submit`` for possible values.

    Returns
    -------
    str
        Captured output from command.

    Raises
    ------
    :class:`JobAborted`
        If the job was aborted (e.g. by running ``condor_rm``).
    :class:`JobFailed`
        If the job terminates abnormally or returns a nonzero exit code.
    :class:`JobRunning`
        If the job is still running. Causes the task to be re-queued until the
        job is complete.
    :class:`subprocess.CalledProcessError`
        If ``condor_submit`` itself fails.

    Example
    -------
    >>> check_output.s(['sleep', '10'],
    ...                accounting_group='ligo.dev.o3.cbc.explore.test')

    """
    # FIXME: Refactor to reuse common code from this task and
    # gwcelery.tasks.condor.submit.

    if log is None:
        # First invocation: allocate paths for the job's log, stderr and
        # stdout, submit the job, then retry with those paths so that
        # subsequent invocations poll the job state.
        log = _mklog('.log')
        error = _mklog('.err')
        output = _mklog('.out')
        kwargs = dict(kwargs,
                      universe='vanilla',
                      executable='/usr/bin/env',
                      getenv='true',
                      log_xml='true',
                      arguments=_escape_args(args),
                      log=log, error=error, output=output)
        try:
            _submit(**kwargs)
        except subprocess.CalledProcessError:
            _rm_f(log, error, output)
            raise
        self.retry((args,), kwargs)
    else:
        event = _read_last_event(log)
        event_type = event.get('MyType')
        if event_type == 'JobTerminatedEvent':
            # Capture stderr and stdout before the files are deleted.
            captured_error = _read(error)
            captured_output = _read(output)
            _rm_f(log, error, output)
            if event.get('TerminatedNormally'):
                returncode = event['ReturnValue']
                if returncode == 0:
                    return captured_output
            else:
                # Do not index ``ReturnValue`` here: if the event lacks it, a
                # KeyError would mask the real failure. Follow the subprocess
                # convention of a negative return code for a process killed
                # by a signal.
                # NOTE(review): assumes HTCondor records the signal number as
                # ``TerminatedBySignal`` -- confirm against the job event log
                # documentation. If it is absent, fall back to the logged
                # return value, or to 1 if there is none.
                signum = event.get('TerminatedBySignal')
                returncode = -signum if signum else (
                    event.get('ReturnValue') or 1)
            raise JobFailed(returncode, args,
                            captured_output,
                            captured_error)
        elif event_type == 'JobAbortedEvent':
            _rm_f(log, error, output)
            raise JobAborted(event)
        else:
            # Any other (or no) event means the job has not finished yet.
            raise JobRunning(event)