Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1"""Source Parameter Estimation with LALInference and Bilby.""" 

2from distutils.spawn import find_executable 

3from distutils.dir_util import mkpath 

4import glob 

5import itertools 

6import json 

7import os 

8import shutil 

9import subprocess 

10import tempfile 

11import urllib 

12 

13from celery import group 

14from gwdatafind import find_urls 

15import numpy as np 

16from requests.exceptions import HTTPError 

17 

18from .. import app 

19from ..jinja import env 

20from .core import ordered_group 

21from . import condor 

22from . import gracedb 

23 

24 

# Name of the LALInference ini file written into the run directory and
# uploaded to GraceDB.
ini_name = 'online_lalinference_pe.ini'

# Map from the executable labels referenced in the ini template to the actual
# executable names; their paths are resolved with find_executable when
# prepare_ini fills out the template.
# NOTE(review): the entries mapped to 'true' (the shell no-op) presumably
# disable those optional steps for online PE — confirm against the template.
executables = {'datafind': 'gw_data_find',
               'mergeNSscript': 'lalinference_nest2pos',
               'mergeMCMCscript': 'cbcBayesMCMC2pos',
               'combinePTMCMCh5script': 'cbcBayesCombinePTMCMCh5s',
               'resultspage': 'cbcBayesPostProc',
               'segfind': 'ligolw_segment_query',
               'ligolw_print': 'ligolw_print',
               'coherencetest': 'lalinference_coherence_test',
               'lalinferencenest': 'lalinference_nest',
               'lalinferencemcmc': 'lalinference_mcmc',
               'lalinferencebambi': 'lalinference_bambi',
               'lalinferencedatadump': 'lalinference_datadump',
               'ligo-skymap-from-samples': 'true',
               'ligo-skymap-plot': 'true',
               'processareas': 'process_areas',
               'computeroqweights': 'lalinference_compute_roq_weights',
               'mpiwrapper': 'lalinference_mpi_wrapper',
               'gracedb': 'gracedb',
               'ppanalysis': 'cbcBayesPPAnalysis',
               'pos_to_sim_inspiral': 'cbcBayesPosToSimInspiral',
               'bayeswave': 'BayesWave',
               'bayeswavepost': 'BayesWavePost'}

49 

50 

def _data_exists(end, frametype_dict):
    """Check whether data at the end time can be found with gwdatafind and
    return True if it is found.

    Parameters
    ----------
    end : float
        The end time of the data searched for
    frametype_dict : dict
        Mapping from an interferometer name to the frame type searched for.
        The first character of each key is used as the gwdatafind observatory
        argument.

    """
    # Data is regarded as existing only when gwdatafind returns at least one
    # URL for every interferometer in the mapping.  Iterate items() instead of
    # keys() to avoid a redundant per-key lookup.
    return min(
        len(find_urls(ifo[0], frametype, end, end + 1))
        for ifo, frametype in frametype_dict.items()
    ) > 0

60 

61 

class NotEnoughData(Exception):
    """Raised when the frame files needed for parameter estimation have not
    arrived yet because of data-transfer latency.
    """

66 

67 

@app.task(bind=True, autoretry_for=(NotEnoughData, ), default_retry_delay=1,
          max_retries=86400, retry_backoff=True, shared=False)
def query_data(self, trigtime):
    """Query for data with gwdatafind, retrying until they are found, and
    return the frame types of the found data. If data are not found within
    86400 seconds = 1 day, raise NotEnoughData.
    """
    end = trigtime + 2
    # Prefer the low-latency frames; fall back to the high-latency ones.
    for config_key in ('low_latency_frame_types', 'high_latency_frame_types'):
        frametypes = app.conf[config_key]
        if _data_exists(end, frametypes):
            return frametypes
    raise NotEnoughData

82 

83 

@app.task(ignore_result=True, shared=False)
def upload_no_frame_files(request, exc, traceback, superevent_id):
    """Upload a GraceDB notification when no frame files are found.

    Parameters
    ----------
    request : Context (placeholder)
        Task request variables
    exc : Exception
        Exception raised by the failed task
    traceback : str (placeholder)
        Traceback message from a task
    superevent_id : str
        The GraceDB ID of a target superevent

    """
    # Only the data-latency failure is reported; any other exception is
    # ignored here.
    if not isinstance(exc, NotEnoughData):
        return
    gracedb.upload.delay(
        filecontents=None, filename=None,
        graceid=superevent_id,
        message='Frame files have not been found.',
        tags='pe'
    )

107 

108 

109def _find_appropriate_cal_env(trigtime, dir_name): 

110 """Return the path to the calibration uncertainties estimated at the time 

111 before and closest to the trigger time. If there are no calibration 

112 uncertainties estimated before the trigger time, return the oldest one. The 

113 gpstimes at which the calibration uncertainties were estimated and the 

114 names of the files containing the uncertaintes are saved in 

115 [HLV]_CalEnvs.txt. 

116 

117 Parameters 

118 ---------- 

119 trigtime : float 

120 The trigger time of a target event 

121 dir_name : str 

122 The path to the directory where files containing calibration 

123 uncertainties exist 

124 

125 Return 

126 ------ 

127 path : str 

128 The path to the calibration uncertainties appropriate for a target 

129 event 

130 

131 """ 

132 filename, = glob.glob(os.path.join(dir_name, '[HLV]_CalEnvs.txt')) 

133 calibration_index = np.atleast_1d( 

134 np.recfromtxt(filename, names=['gpstime', 'filename']) 

135 ) 

136 gpstimes = calibration_index['gpstime'] 

137 candidate_gpstimes = gpstimes < trigtime 

138 if np.any(candidate_gpstimes): 

139 idx = np.argmax(gpstimes * candidate_gpstimes) 

140 appropriate_cal = calibration_index['filename'][idx] 

141 else: 

142 appropriate_cal = calibration_index['filename'][np.argmin(gpstimes)] 

143 return os.path.join(dir_name, appropriate_cal.decode('utf-8')) 

144 

145 

@app.task(shared=False)
def prepare_ini(frametype_dict, event, superevent_id=None):
    """Work out PE settings appropriate for the target event and return the
    resulting ini file content for the LALInference pipeline.
    """
    # Load the jinja template of the ini file.
    ini_template = env.get_template('online_pe.jinja2')

    sngl_inspirals = event['extra_attributes']['SingleInspiral']
    trigtime = event['gpstime']
    calenv_dir = '/home/cbc/pe/O3/calibrationenvelopes'
    settings = {
        'gracedb_host': app.conf['gracedb_host'],
        'types': frametype_dict,
        'channels': app.conf['strain_channel_names'],
        'state_vector_channels': app.conf['state_vector_channel_names'],
        'webdir': os.path.join(
            app.conf['pe_results_path'], event['graceid'], 'lalinference'
        ),
        'paths': [{'name': name, 'path': find_executable(executable)}
                  for name, executable in executables.items()],
        'h1_calibration': _find_appropriate_cal_env(
            trigtime, os.path.join(calenv_dir, 'LIGO_Hanford')
        ),
        'l1_calibration': _find_appropriate_cal_env(
            trigtime, os.path.join(calenv_dir, 'LIGO_Livingston')
        ),
        'v1_calibration': _find_appropriate_cal_env(
            trigtime, os.path.join(calenv_dir, 'Virgo')
        ),
        # Smallest chirp mass and mass ratio among the single-detector
        # triggers.
        'mc': min(sngl['mchirp'] for sngl in sngl_inspirals),
        'q': min(sngl['mass2'] / sngl['mass1'] for sngl in sngl_inspirals),
        'mpirun': find_executable('mpirun')
    }
    # Render once for the PE ROTA and once more for the online run.
    ini_rota = ini_template.render(settings)
    settings['use_of_ini'] = 'online'
    ini_online = ini_template.render(settings)
    # Upload the LALInference ini file to GraceDB.
    if superevent_id is not None:
        gracedb.upload.delay(
            ini_rota, filename=ini_name, graceid=superevent_id,
            message=('Automatically generated LALInference configuration file'
                     ' for this event.'),
            tags='pe')

    return ini_online

196 

197 

def pre_pe_tasks(event, superevent_id):
    """Return the canvas of tasks executed before parameter estimation
    starts.
    """
    # Query for frames first; if they never arrive, report it to GraceDB.
    data_query = query_data.s(event['gpstime']).on_error(
        upload_no_frame_files.s(superevent_id)
    )
    return data_query | prepare_ini.s(event, superevent_id)

203 

204 

@app.task(shared=False)
def _setup_dag_for_lalinference(coinc_psd, ini_contents,
                                rundir, superevent_id):
    """Create DAG for a lalinference run and return the path to DAG.

    Parameters
    ----------
    coinc_psd : tuple of byte contents
        Tuple of the byte contents of ``coinc.xml`` and ``psd.xml.gz``
    ini_contents : str
        The content of online_lalinference_pe.ini
    rundir : str
        The path to a run directory where the DAG file exits
    superevent_id : str
        The GraceDB ID of a target superevent

    Returns
    -------
    path_to_dag : str
        The path to the .dag file

    """
    coinc_contents, psd_contents = coinc_psd

    # write down coinc.xml in the run directory
    path_to_coinc = os.path.join(rundir, 'coinc.xml')
    with open(path_to_coinc, 'wb') as f:
        f.write(coinc_contents)

    # write down psd.xml.gz; the PSD is optional and only passed to
    # lalinference_pipe when it was actually downloaded
    if psd_contents is not None:
        path_to_psd = os.path.join(rundir, 'psd.xml.gz')
        with open(path_to_psd, 'wb') as f:
            f.write(psd_contents)
        psd_arg = ['--psd', path_to_psd]
    else:
        psd_arg = []

    # write down .ini file in the run directory.
    path_to_ini = os.path.join(rundir, ini_name)
    with open(path_to_ini, 'w') as f:
        f.write(ini_contents)

    # run lalinference_pipe to generate the DAG in the run directory
    try:
        subprocess.run(
            ['lalinference_pipe', '--run-path', rundir,
             '--coinc', path_to_coinc, path_to_ini] + psd_arg,
            capture_output=True, check=True)
    except subprocess.CalledProcessError as e:
        # Upload the command line, stdout and stderr of the failed
        # lalinference_pipe invocation for debugging, remove the run
        # directory, and propagate the error.
        contents = b'args:\n' + json.dumps(e.args[1]).encode('utf-8') + \
            b'\n\nstdout:\n' + e.stdout + b'\n\nstderr:\n' + e.stderr
        gracedb.upload.delay(
            filecontents=contents, filename='pe_dag.log',
            graceid=superevent_id,
            message='Failed to prepare DAG for lalinference', tags='pe'
        )
        shutil.rmtree(rundir)
        raise
    else:
        # Remove the ini file so that people do not accidentally use this ini
        # file and hence online-PE-only nodes.
        os.remove(path_to_ini)

    return os.path.join(rundir, 'multidag.dag')

269 

270 

@app.task(shared=False)
def _setup_dag_for_bilby(event, rundir, preferred_event_id, superevent_id):
    """Create DAG for a bilby run and return the path to DAG.

    Parameters
    ----------
    event : json contents
        The json contents retrieved from gracedb.get_event()
    rundir : str
        The path to a run directory where the DAG file exits
    preferred_event_id : str
        The GraceDB ID of a target preferred event
    superevent_id : str
        The GraceDB ID of a target superevent

    Returns
    -------
    path_to_dag : str
        The path to the .dag file

    """
    # Save the event information for bilby_pipe_gracedb.
    path_to_json = os.path.join(rundir, 'event.json')
    with open(path_to_json, 'w') as f:
        json.dump(event, f, indent=2)

    path_to_webdir = os.path.join(
        app.conf['pe_results_path'], preferred_event_id, 'bilby'
    )

    setup_arg = ['bilby_pipe_gracedb', '--webdir', path_to_webdir,
                 '--outdir', rundir, '--json', path_to_json, '--online-pe',
                 '--convert-to-flat-in-component-mass']

    # Use replay channels and quick sampler settings everywhere except the
    # production GraceDB server.
    if not app.conf['gracedb_host'] == 'gracedb.ligo.org':
        setup_arg += ['--channel-dict', 'o2replay',
                      '--sampler-kwargs', 'FastTest']
    try:
        subprocess.run(setup_arg, capture_output=True, check=True)
    except subprocess.CalledProcessError as e:
        # Upload the command line, stdout and stderr of the failed
        # bilby_pipe_gracedb invocation for debugging, remove the run
        # directory, and propagate the error.
        contents = b'args:\n' + json.dumps(e.args[1]).encode('utf-8') + \
            b'\n\nstdout:\n' + e.stdout + b'\n\nstderr:\n' + e.stderr
        gracedb.upload.delay(
            filecontents=contents, filename='pe_dag.log',
            graceid=superevent_id,
            message='Failed to prepare DAG for bilby', tags='pe'
        )
        shutil.rmtree(rundir)
        raise
    else:
        # Uploads bilby ini file to GraceDB
        group(upload_results_tasks(
            rundir, 'bilby_config.ini', superevent_id,
            'Automatically generated Bilby configuration file',
            'pe', 'online_bilby_pe.ini')).delay()

    # NOTE: a leftover debug print of the DAG path was removed here.
    path_to_dag, = glob.glob(os.path.join(rundir, 'submit/dag*.submit'))
    return path_to_dag

329 

330 

@app.task(shared=False)
def _condor_no_submit(path_to_dag):
    """Generate the condor .sub file for a DAG without submitting it.

    Runs ``condor_submit_dag -no_submit`` on the DAG and returns the path to
    the generated .sub file.
    """
    cmd = ['condor_submit_dag', '-no_submit', path_to_dag]
    subprocess.run(cmd, capture_output=True, check=True)
    return '{}.condor.sub'.format(path_to_dag)

337 

338 

@app.task(shared=False)
def dag_prepare_task(rundir, superevent_id, preferred_event_id, pe_pipeline,
                     ini_contents=None):
    """Return a canvas of tasks to prepare DAG.

    Parameters
    ----------
    rundir : str
        The path to a run directory where the DAG file exits
    superevent_id : str
        The GraceDB ID of a target superevent
    preferred_event_id : str
        The GraceDB ID of a target preferred event
    pe_pipeline : str
        The parameter estimation pipeline used
        Either 'lalinference' OR 'bilby'
    ini_contents : str
        The content of online_lalinference_pe.ini
        Required if pe_pipeline == 'lalinference'

    Returns
    -------
    canvas : canvas of tasks
        The canvas of tasks to prepare DAG

    """
    if pe_pipeline == 'lalinference':
        # Download coinc.xml and the (optional) PSD, then build the DAG.
        download = ordered_group(
            gracedb.download.si('coinc.xml', preferred_event_id),
            _download_psd.si(preferred_event_id)
        )
        canvas = download | _setup_dag_for_lalinference.s(
            ini_contents, rundir, superevent_id
        )
    elif pe_pipeline == 'bilby':
        # Fetch the event record and hand it to bilby_pipe_gracedb.
        canvas = (
            gracedb.get_event.si(preferred_event_id)
            | _setup_dag_for_bilby.s(rundir, preferred_event_id,
                                     superevent_id)
        )
    else:
        raise NotImplementedError(f'Unknown PE pipeline {pe_pipeline}.')
    # Finish by generating the condor .sub file without submitting.
    return canvas | _condor_no_submit.s()

377 

378 

379def _find_paths_from_name(directory, name): 

380 """Return the paths of files or directories with given name under the 

381 specfied directory 

382 

383 Parameters 

384 ---------- 

385 directory : string 

386 Name of directory under which the target file or directory is searched 

387 for. 

388 name : string 

389 Name of target files or directories 

390 

391 Returns 

392 ------- 

393 paths : generator 

394 Paths to the target files or directories 

395 

396 """ 

397 return glob.iglob(os.path.join(directory, '**', name), recursive=True) 

398 

399 

@app.task(ignore_result=True, shared=False)
def job_error_notification(request, exc, traceback,
                           superevent_id, rundir, pe_pipeline):
    """Upload notification when condor.submit terminates unexpectedly.

    Parameters
    ----------
    request : Context (placeholder)
        Task request variables
    exc : Exception
        Exception raised by condor.submit
    traceback : str (placeholder)
        Traceback message from a task
    superevent_id : str
        The GraceDB ID of a target superevent
    rundir : str
        The run directory for PE
    pe_pipeline : str
        The parameter estimation pipeline used
        Either lalinference OR bilby

    """
    # Report how the condor job ended, if one of the known failure modes.
    if isinstance(exc, condor.JobAborted):
        message = 'The {} condor job was aborted.'.format(pe_pipeline)
    elif isinstance(exc, condor.JobFailed):
        message = 'The {} condor job failed.'.format(pe_pipeline)
    else:
        message = None
    if message is not None:
        gracedb.upload.delay(
            filecontents=None, filename=None, graceid=superevent_id,
            tags='pe', message=message
        )
    # Upload the contents of every non-empty .log, .err and .out file found
    # under the run directory.
    paths = itertools.chain.from_iterable(
        _find_paths_from_name(rundir, pattern)
        for pattern in ('*.log', '*.err', '*.out')
    )
    for path in paths:
        with open(path, 'rb') as f:
            contents = f.read()
        if not contents:
            continue
        # put .log suffix in log file names so that users can directly
        # read the contents instead of downloading them when they click
        # file names
        gracedb.upload.delay(
            filecontents=contents,
            filename=os.path.basename(path) + '.log',
            graceid=superevent_id,
            message='A log file for {} condor job.'.format(pe_pipeline),
            tags='pe'
        )

452 

@app.task(ignore_result=True, shared=False)
def _upload_url(pe_results_path, graceid, pe_pipeline):
    """Upload the URL of the page that collects all of the PE plots."""
    # Pick the top-level results page produced by the pipeline.
    if pe_pipeline == 'lalinference':
        page_name = 'posplots.html'
    elif pe_pipeline == 'bilby':
        page_name = 'home.html'
    else:
        raise NotImplementedError(f'Unknown PE pipeline {pe_pipeline}.')
    path_to_posplots, = _find_paths_from_name(pe_results_path, page_name)

    relative_path = os.path.relpath(
        path_to_posplots, app.conf['pe_results_path']
    )
    baseurl = urllib.parse.urljoin(app.conf['pe_results_url'], relative_path)
    gracedb.upload.delay(
        filecontents=None, filename=None, graceid=graceid,
        message=('Online {} parameter estimation finished.'
                 '<a href={}>results</a>').format(pe_pipeline, baseurl),
        tags='pe'
    )

480 

481 

def upload_results_tasks(pe_results_path, filename, graceid, message, tag,
                         uploaded_filename=None):
    """Return tasks that read PE result files and upload their contents to
    GraceDB.

    Parameters
    ----------
    pe_results_path : string
        Directory under which the target file located.
    filename : string
        Name of the target file
    graceid : string
        GraceDB ID
    message : string
        Message uploaded to GraceDB
    tag : str
        Name of tag to add the GraceDB log
    uploaded_filename : str
        Name of the uploaded file. If not supplied, it is the same as the
        original file name.

    Returns
    -------
    tasks : list of celery tasks

    """
    tasks = []
    for path in _find_paths_from_name(pe_results_path, filename):
        name = (os.path.basename(path) if uploaded_filename is None
                else uploaded_filename)
        with open(path, 'rb') as f:
            contents = f.read()
        tasks.append(gracedb.upload.si(contents, name, graceid, message, tag))
    return tasks

518 

519 

@app.task(ignore_result=True, shared=False)
def clean_up(rundir):
    """Delete a run directory and everything under it.

    Parameters
    ----------
    rundir : str
        The path to the run directory to be removed

    """
    shutil.rmtree(rundir)

531 

532 

@app.task(ignore_result=True, shared=False)
def dag_finished(rundir, preferred_event_id, superevent_id, pe_pipeline):
    """Upload PE results and clean up run directory

    Parameters
    ----------
    rundir : str
        The path to a run directory where the DAG file exits
    preferred_event_id : str
        The GraceDB ID of a target preferred event
    superevent_id : str
        The GraceDB ID of a target superevent
    pe_pipeline : str
        The parameter estimation pipeline used
        Either lalinference OR bilby

    Returns
    -------
    tasks : canvas
        The work-flow for uploading PE results

    """
    # Directory where the pipeline published its result pages and plots.
    pe_results_path = os.path.join(
        app.conf['pe_results_path'], preferred_event_id, pe_pipeline
    )

    # Each entry is (search directory, glob pattern of the file to upload,
    # GraceDB log message, uploaded file name or None to keep the original).
    if pe_pipeline == 'lalinference':
        uploads = [
            (rundir, 'glitch_median_PSD_forLI_*.dat',
             'Bayeswave PSD used for LALInference PE', None),
            (rundir, 'lalinference*.dag', 'LALInference DAG', None),
            (rundir, 'posterior*.hdf5',
             'LALInference posterior samples',
             'LALInference.posterior_samples.hdf5'),
            (pe_results_path, 'extrinsic.png',
             'LALInference corner plot for extrinsic parameters',
             'LALInference.extrinsic.png'),
            (pe_results_path, 'sourceFrame.png',
             'LALInference corner plot for source frame parameters',
             'LALInference.intrinsic.png')
        ]
    elif pe_pipeline == 'bilby':
        resultdir = os.path.join(rundir, 'result')
        uploads = [
            (resultdir, '*merge_result.json',
             'Bilby posterior samples',
             'Bilby.posterior_samples.json'),
            (resultdir, '*_extrinsic_corner.png',
             'Bilby corner plot for extrinsic parameters',
             'Bilby.extrinsic.png'),
            (resultdir, '*_intrinsic_corner.png',
             'Bilby corner plot for intrinsic parameters',
             'Bilby.intrinsic.png')
        ]
    else:
        raise NotImplementedError(f'Unknown PE pipeline {pe_pipeline}.')

    # Expand each (directory, pattern, message, name) entry into upload tasks
    # for every matching file.
    upload_tasks = []
    for dir, name1, comment, name2 in uploads:
        upload_tasks += upload_results_tasks(
            dir, name1, superevent_id, comment, 'pe', name2)

    # FIXME: _upload_url.si has to be out of group for
    # gracedb.create_label.si to run
    (
        _upload_url.si(pe_results_path, superevent_id, pe_pipeline)
        |
        group(upload_tasks)
        |
        clean_up.si(rundir)
    ).delay()

    # Label the superevent so downstream consumers know PE is done.
    # NOTE(review): only done for lalinference here — presumably bilby results
    # are not yet treated as final; confirm.
    if pe_pipeline == 'lalinference':
        gracedb.create_label.delay('PE_READY', superevent_id)

@gracedb.task(shared=False)
def _download_psd(gid):
    """Download ``psd.xml.gz`` from GraceDB and return its content.

    Returns None if the file does not exist.
    """
    try:
        psd_contents = gracedb.download("psd.xml.gz", gid)
    except HTTPError:
        return None
    return psd_contents

619 

@app.task(ignore_result=True, shared=False)
def start_pe(ini_contents, preferred_event_id, superevent_id, pe_pipeline):
    """Run Parameter Estimation on a given event.

    Parameters
    ----------
    ini_contents : str
        The content of online_lalinference_pe.ini
    preferred_event_id : str
        The GraceDB ID of a target preferred event
    superevent_id : str
        The GraceDB ID of a target superevent
    pe_pipeline : str
        The parameter estimation pipeline used
        lalinference OR bilby

    """
    gracedb.upload.delay(
        filecontents=None, filename=None, graceid=superevent_id,
        message=('Starting {} online parameter estimation '
                 'for {}').format(pe_pipeline, preferred_event_id),
        tags='pe'
    )

    # make a run directory
    pipeline_dir = os.path.expanduser('~/.cache/{}'.format(pe_pipeline))
    # distutils is deprecated (PEP 632) and removed in Python 3.12, so use
    # os.makedirs instead of distutils.dir_util.mkpath.
    os.makedirs(pipeline_dir, exist_ok=True)
    rundir = tempfile.mkdtemp(
        dir=pipeline_dir, prefix='{}_'.format(superevent_id)
    )

    # give permissions to read the files under the run directory so that PE
    # ROTA people can check the status of parameter estimation.
    os.chmod(rundir, 0o755)

    # Prepare the DAG, submit it, and upload the results when it finishes.
    canvas = (
        dag_prepare_task(
            rundir, superevent_id, preferred_event_id,
            pe_pipeline, ini_contents
        )
        |
        condor.submit.s().on_error(
            job_error_notification.s(superevent_id, rundir, pe_pipeline)
        )
        |
        dag_finished.si(
            rundir, preferred_event_id, superevent_id, pe_pipeline
        )
    )
    canvas.delay()