Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1"""Submit and monitor HTCondor jobs [1]_. 

2 

3Notes 

4----- 

5Internally, we use the XML condor log format [2]_ for easier parsing. 

6 

7References 

8---------- 

9.. [1] http://research.cs.wisc.edu/htcondor/manual/latest/condor_submit.html 

10.. [2] http://research.cs.wisc.edu/htcondor/classad/refman/node3.html 

11 

12""" 

13from distutils.dir_util import mkpath 

14import os 

15import subprocess 

16import tempfile 

17 

18import lxml.etree 

19 

20from .. import app 

21 

22 

def _escape_arg(arg):
    """Escape a command line argument for an HTCondor submit file.

    Implements the per-argument part of the "new" (double-quoted) syntax of
    the ``arguments`` submit command:

    * a literal double quote is written as ``""``;
    * a literal single quote is written as ``''``, which is only recognized
      inside a single-quoted section;
    * whitespace inside an argument must be inside a single-quoted section.

    The argument is therefore enclosed in single quotes whenever it contains
    whitespace or a single quote, and also when it is empty so that it is
    passed as an empty argument instead of vanishing.

    Parameters
    ----------
    arg : str
        A single, unescaped command line argument.

    Returns
    -------
    str
        The escaped argument, suitable for joining with spaces inside the
        outer double quotes (see :func:`_escape_args`).
    """
    escaped = arg.replace('"', '""').replace("'", "''")
    # Doubled single quotes are only literal inside a single-quoted section,
    # so any argument containing one must be wrapped, not just those with
    # whitespace.
    if not arg or any(char in arg for char in " \t'"):
        escaped = "'" + escaped + "'"
    return escaped

29 

30 

def _escape_args(args):
    """Build the value of the ``arguments`` submit command.

    Every element of *args* is escaped individually with :func:`_escape_arg`.
    The escaped pieces are separated by single spaces, and the whole string is
    enclosed in double quotes.
    """
    joined = ' '.join(map(_escape_arg, args))
    return '"{}"'.format(joined)

34 

35 

def _mklog(suffix):
    """Create a unique path for an HTCondor log.

    The path lies in ``~/.cache/condor``, which is created if necessary, and
    ends in *suffix*. A temporary file is created to pick an unused name and
    is deleted again before returning. Presumably HTCondor then creates the
    file itself when it writes to it.

    Parameters
    ----------
    suffix : str
        File name suffix, e.g. ``'.log'``, ``'.err'`` or ``'.out'``.

    Returns
    -------
    str
        Path of a file that did not exist when this function returned.
    """
    condor_dir = os.path.expanduser('~/.cache/condor')
    # os.makedirs replaces distutils.dir_util.mkpath: distutils was deprecated
    # by PEP 632 and removed in Python 3.12.
    os.makedirs(condor_dir, exist_ok=True)
    # NOTE(review): the name is only reserved while the temporary file exists;
    # once it is deleted on leaving the ``with`` block, another process could
    # in principle reuse it.
    with tempfile.NamedTemporaryFile(dir=condor_dir, suffix=suffix) as f:
        return f.name

42 

43 

def _read(filename):
    """Return the entire contents of *filename* as a string.

    The file is opened in text mode with the platform's default encoding.
    """
    with open(filename) as stream:
        text = stream.read()
    return text

47 

48 

def _rm_f(*args):
    """Remove each given file, ignoring any that cannot be removed.

    This works in the spirit of ``rm -f``. Every :class:`OSError` raised while
    removing a path (missing file, permission problem, ...) is swallowed, so
    this is safe to call from cleanup code.
    """
    for path in args:
        try:
            os.unlink(path)
        except OSError:
            continue

55 

56 

def _parse_classad(c):
    """Turn a ClassAd XML fragment into ``(name, value)`` pairs.

    Note that this supports only the small subset of the ClassAd XML
    syntax [2]_ that we need to determine if a job succeeded or failed:
    string (``<s>``), boolean (``<b v="t"/>``) and integer (``<i>``)
    attribute values. Attributes holding any other value type are skipped.

    Parameters
    ----------
    c : element or None
        A ``<c>`` element whose ``<a n="NAME">`` children each contain exactly
        one value element. If ``None``, nothing is yielded.

    Yields
    ------
    key : str
        The attribute name (the ``n`` attribute of ``<a>``).
    value : str, bool or int
        The converted attribute value.

    Raises
    ------
    ValueError
        If an ``<a>`` element does not have exactly one child.
    """
    if c is not None:
        for a in c.findall('a'):
            key = a.attrib['n']
            # Iterate the element directly: getchildren() is deprecated in
            # lxml and no longer exists in xml.etree (Python 3.9+).
            child, = list(a)
            if child.tag == 's':
                # An empty <s/> has text None; map it to '' rather than 'None'.
                value = child.text or ''
            elif child.tag == 'b':
                value = (child.attrib['v'] == 't')
            elif child.tag == 'i':
                value = int(child.text)
            else:
                # Coverage skipped below because the Python compiler optimizes
                # away ``continue`` statements.
                #
                # See <https://bitbucket.org/ned/coveragepy/issues/198>.
                continue  # pragma: no cover
            yield key, value

80 

81 

def _read_last_event(log):
    """Return the most recent event in an HTCondor XML job log as a dict.

    The log text is wrapped in a synthetic ``<classads>`` root element before
    parsing, presumably because the log is a bare sequence of ``<c>``
    elements. The last ``<c>`` element is converted with
    :func:`_parse_classad`. If the log holds no ``<c>`` element, the result
    is an empty dict.

    FIXME: It would be more efficient in terms of I/O and file descriptors to
    use a single HTCondor log file for all jobs and use the inotify
    capabilities of ``htcondor.read_events`` to avoid unnecessary polling.
    """
    document = '<classads>' + _read(log) + '</classads>'
    root = lxml.etree.fromstring(document)
    newest = root.find('c[last()]')
    return dict(_parse_classad(newest))

91 

92 

def _submit(submit_file=None, **kwargs):
    """Run ``condor_submit`` and raise if it exits with a nonzero status.

    Each keyword argument is passed as a ``-append key=value`` submit
    command. If *submit_file* is ``None``, ``/dev/null`` is used as the
    submit file and one job is queued with ``-queue 1``. In that case the
    appended commands describe the whole job.

    Raises
    ------
    subprocess.CalledProcessError
        If ``condor_submit`` fails. Its captured stdout and stderr are
        attached to the exception.
    """
    appended = []
    for name, setting in kwargs.items():
        appended.extend(['-append', '{}={}'.format(name, setting)])
    if submit_file is not None:
        source = [submit_file]
    else:
        source = ['/dev/null', '-queue', '1']
    subprocess.run(['condor_submit'] + appended + source,
                   capture_output=True, check=True)

102 

103 

class JobAborted(Exception):
    """An HTCondor job was aborted before completing (e.g. by ``condor_rm``).

    The tasks in this module raise it with the parsed abort event from the
    job log as its only argument.
    """

106 

107 

class JobRunning(Exception):
    """An HTCondor job has not finished yet.

    The tasks in this module list this exception in ``autoretry_for``, so
    raising it re-queues the task to poll the job log again.
    """

110 

111 

class JobFailed(subprocess.CalledProcessError):
    """An HTCondor job terminated unsuccessfully.

    It subclasses :class:`subprocess.CalledProcessError`, so ``returncode``,
    ``cmd``, ``output`` and ``stderr`` are available just as for a failed
    local process.
    """

114 

115 

@app.task(bind=True, autoretry_for=(JobRunning,), default_retry_delay=1,
          ignore_result=True, max_retries=None, retry_backoff=True,
          shared=False)
def submit(self, submit_file, log=None):
    """Submit a job using HTCondor.

    On the first call the job is submitted with an XML job log. The task then
    re-queues itself with the log path and, on every later run, inspects the
    last event in that log until the job terminates or is aborted.

    Parameters
    ----------
    submit_file : str
        Path of the submit file.
    log : str
        Used internally to track job state. Caller should not set.

    Raises
    ------
    :class:`JobAborted`
        If the job was aborted (e.g. by running ``condor_rm``).
    :class:`JobFailed`
        If the job terminates with a nonzero exit code or terminates
        abnormally. For abnormal termination the return code is the negated
        signal number when one is recorded (the :mod:`subprocess`
        convention), otherwise ``None``.
    :class:`JobRunning`
        If the job is still running. Causes the task to be re-queued until the
        job is complete.
    :class:`subprocess.CalledProcessError`
        If ``condor_submit`` itself fails.

    Example
    -------
    >>> submit.s('example.sub')

    """
    if log is None:
        log = _mklog('.log')
        try:
            _submit(submit_file, log_xml='true', log=log)
        except subprocess.CalledProcessError:
            _rm_f(log)
            raise
        # Re-queue this task with the log path so the next run polls it.
        self.retry((submit_file,), dict(log=log))
    else:
        event = _read_last_event(log)
        if event.get('MyType') == 'JobTerminatedEvent':
            _rm_f(log)
            if event['TerminatedNormally']:
                returncode = event['ReturnValue']
            else:
                # Previously an abnormal termination was reported as success.
                # Assumption (verify against HTCondor's JobTerminatedEvent):
                # the signal number is recorded as TerminatedBySignal instead
                # of a ReturnValue.
                signal_number = event.get('TerminatedBySignal')
                returncode = (None if signal_number is None
                              else -signal_number)
            if returncode != 0:
                raise JobFailed(returncode, (submit_file,))
        elif event.get('MyType') == 'JobAbortedEvent':
            _rm_f(log)
            raise JobAborted(event)
        else:
            raise JobRunning(event)

164 

165 

@app.task(bind=True, autoretry_for=(JobRunning,), default_retry_delay=1,
          max_retries=None, retry_backoff=True, shared=False)
def check_output(self, args, log=None, error=None, output=None, **kwargs):
    """Call a process using HTCondor.

    Call an external process using HTCondor, in a manner patterned after
    :meth:`subprocess.check_output`. If successful, returns its output on
    stdout. On failure, raise an exception.

    The command runs in the vanilla universe through ``/usr/bin/env`` with the
    submitting environment (``getenv = true``). The ``universe``,
    ``executable``, ``getenv``, ``log_xml``, ``arguments``, ``log``,
    ``error`` and ``output`` commands set by this task override any values
    passed in *kwargs*.

    Parameters
    ----------
    args : list
        Command line arguments, as if passed to :func:`subprocess.check_output`.
    log, error, output : str
        Used internally to track job state. Caller should not set.
    **kwargs
        Extra submit description file commands. See the documentation for
        ``condor_submit`` for possible values.

    Returns
    -------
    str
        Captured output from command.

    Raises
    ------
    :class:`JobAborted`
        If the job was aborted (e.g. by running ``condor_rm``).
    :class:`JobFailed`
        If the job terminates with a nonzero exit code or terminates
        abnormally. For abnormal termination the return code is the negated
        signal number when one is recorded (the :mod:`subprocess`
        convention), otherwise ``None``.
    :class:`JobRunning`
        If the job is still running. Causes the task to be re-queued until the
        job is complete.
    :class:`subprocess.CalledProcessError`
        If ``condor_submit`` itself fails.

    Example
    -------
    >>> check_output.s(['sleep', '10'],
    ...                accounting_group='ligo.dev.o3.cbc.explore.test')

    """
    # FIXME: Refactor to reuse common code from this task and
    # gwcelery.tasks.condor.submit.

    if log is None:
        log = _mklog('.log')
        error = _mklog('.err')
        output = _mklog('.out')
        # The explicit keywords take precedence over caller-supplied kwargs.
        kwargs = dict(kwargs,
                      universe='vanilla',
                      executable='/usr/bin/env',
                      getenv='true',
                      log_xml='true',
                      arguments=_escape_args(args),
                      log=log, error=error, output=output)
        try:
            _submit(**kwargs)
        except subprocess.CalledProcessError:
            _rm_f(log, error, output)
            raise
        # Re-queue with the full command set; it now carries the log, error
        # and output paths, so the next run takes the polling branch.
        self.retry((args,), kwargs)
    else:
        event = _read_last_event(log)
        if event.get('MyType') == 'JobTerminatedEvent':
            captured_error = _read(error)
            captured_output = _read(output)
            _rm_f(log, error, output)
            if event['TerminatedNormally']:
                returncode = event['ReturnValue']
            else:
                # Do not index ReturnValue here: it may be absent for abnormal
                # termination. Assumption (verify against HTCondor's
                # JobTerminatedEvent): the signal number is recorded as
                # TerminatedBySignal.
                signal_number = event.get('TerminatedBySignal')
                returncode = (None if signal_number is None
                              else -signal_number)
            if returncode == 0:
                return captured_output
            raise JobFailed(returncode, args,
                            captured_output,
                            captured_error)
        elif event.get('MyType') == 'JobAbortedEvent':
            _rm_f(log, error, output)
            raise JobAborted(event)
        else:
            raise JobRunning(event)