Coverage for gwcelery/tasks/inference.py: 99% (327 statements)
coverage.py v7.4.4, created at 2024-04-17 17:22 +0000

1"""Source Parameter Estimation with LALInference, Bilby, and RapidPE.""" 

2import glob 

3import json 

4import os 

5import subprocess 

6import urllib 

7from distutils.dir_util import mkpath 

8from distutils.spawn import find_executable 

9 

10import numpy as np 

11from bilby_pipe.bilbyargparser import BilbyConfigFileParser 

12from bilby_pipe.utils import convert_string_to_dict 

13from celery import group 

14from celery.exceptions import Ignore 

15 

16from .. import app 

17from ..jinja import env 

18from . import condor, gracedb 

19 

20_RAPIDPE_NO_GSTLAL_TRIGGER_EXIT_CODE = 100 

21 

22RAPIDPE_GETENV = [ 

23 "DEFAULT_SEGMENT_SERVER", "GWDATAFIND_SERVER", 

24 "LAL_DATA_PATH", "LD_LIBRARY_PATH", "LIBRARY_PATH", 

25 "NDSSERVER", "PATH", "PYTHONPATH", 

26] 

27"""Names of environment variables to include in RapidPE HTCondor submit 

28files.""" 

29 

30RAPIDPE_ENVIRONMENT = { 

31 "RIFT_LOWLATENCY": "True", 

32} 

33"""Names and values of environment variables to include in RapidPE HTCondor 

34submit files.""" 


def _find_appropriate_cal_env(trigtime, dir_name):
    """Return the path to the calibration uncertainties estimated at the time
    before and closest to the trigger time. If there are no calibration
    uncertainties estimated before the trigger time, return the oldest one.
    The gpstimes at which the calibration uncertainties were estimated and
    the names of the files containing the uncertainties are saved in
    [HLV]_CalEnvs.txt.

    Parameters
    ----------
    trigtime : float
        The trigger time of a target event
    dir_name : str
        The path to the directory where files containing calibration
        uncertainties exist

    Returns
    -------
    path : str
        The path to the calibration uncertainties appropriate for a target
        event

    """
    filename, = glob.glob(os.path.join(dir_name, '[HLV]_CalEnvs.txt'))
    calibration_index = np.atleast_1d(
        np.recfromtxt(filename, names=['gpstime', 'filename'])
    )
    gpstimes = calibration_index['gpstime']
    candidate_gpstimes = gpstimes < trigtime
    if np.any(candidate_gpstimes):
        idx = np.argmax(gpstimes * candidate_gpstimes)
        appropriate_cal = calibration_index['filename'][idx]
    else:
        appropriate_cal = calibration_index['filename'][np.argmin(gpstimes)]
    return os.path.join(dir_name, appropriate_cal.decode('utf-8'))
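

# A minimal sketch of the selection rule above, on hypothetical gpstimes:
# among epochs strictly before the trigger, pick the latest one; if none
# precede the trigger, fall back to the oldest epoch. The mask trick
# ``gpstimes * candidate_gpstimes`` zeroes out epochs at or after the
# trigger, which is safe because GPS times are positive.
def _example_cal_env_selection():
    gpstimes = np.array([1126000000, 1187000000, 1238000000])
    for trigtime, expected in [(1200000000, 1187000000),
                               (1100000000, 1126000000)]:
        candidate = gpstimes < trigtime
        if np.any(candidate):
            chosen = gpstimes[np.argmax(gpstimes * candidate)]
        else:
            chosen = gpstimes[np.argmin(gpstimes)]
        assert chosen == expected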


def prepare_lalinference_ini(event, superevent_id):
    """Determine LALInference configurations and return ini file content.

    Parameters
    ----------
    event : dict
        The json contents of a target G event retrieved from
        gracedb.get_event(), whose mass and spin information are used to
        determine analysis settings.
    superevent_id : str
        The GraceDB ID of a target superevent

    Returns
    -------
    ini_contents : str

    """
    # Get template of .ini file
    ini_template = env.get_template('lalinference.jinja2')

    # fill out the ini template and return the resultant content
    singleinspiraltable = event['extra_attributes']['SingleInspiral']
    trigtime = event['gpstime']
    executables = {'datafind': 'gw_data_find',
                   'mergeNSscript': 'lalinference_nest2pos',
                   'mergeMCMCscript': 'cbcBayesMCMC2pos',
                   'combinePTMCMCh5script': 'cbcBayesCombinePTMCMCh5s',
                   'resultspage': 'cbcBayesPostProc',
                   'segfind': 'ligolw_segment_query',
                   'ligolw_print': 'ligolw_print',
                   'coherencetest': 'lalinference_coherence_test',
                   'lalinferencenest': 'lalinference_nest',
                   'lalinferencemcmc': 'lalinference_mcmc',
                   'lalinferencebambi': 'lalinference_bambi',
                   'lalinferencedatadump': 'lalinference_datadump',
                   'ligo-skymap-from-samples': 'true',
                   'ligo-skymap-plot': 'true',
                   'processareas': 'process_areas',
                   'computeroqweights': 'lalinference_compute_roq_weights',
                   'mpiwrapper': 'lalinference_mpi_wrapper',
                   'gracedb': 'gracedb',
                   'ppanalysis': 'cbcBayesPPAnalysis',
                   'pos_to_sim_inspiral': 'cbcBayesPosToSimInspiral',
                   'bayeswave': 'BayesWave',
                   'bayeswavepost': 'BayesWavePost'}
    ini_settings = {
        'gracedb_host': app.conf['gracedb_host'],
        'types': app.conf['low_latency_frame_types'],
        'channels': app.conf['strain_channel_names'],
        'state_vector_channels': app.conf['state_vector_channel_names'],
        'webdir': os.path.join(
            app.conf['pe_results_path'], superevent_id, 'lalinference'
        ),
        'paths': [{'name': name, 'path': find_executable(executable)}
                  for name, executable in executables.items()],
        'h1_calibration': _find_appropriate_cal_env(
            trigtime,
            '/home/cbc/pe/O3/calibrationenvelopes/LIGO_Hanford'
        ),
        'l1_calibration': _find_appropriate_cal_env(
            trigtime,
            '/home/cbc/pe/O3/calibrationenvelopes/LIGO_Livingston'
        ),
        'v1_calibration': _find_appropriate_cal_env(
            trigtime,
            '/home/cbc/pe/O3/calibrationenvelopes/Virgo'
        ),
        'mc': min([sngl['mchirp'] for sngl in singleinspiraltable]),
        'q': min([sngl['mass2'] / sngl['mass1']
                  for sngl in singleinspiraltable]),
        'mpirun': find_executable('mpirun')
    }
    return ini_template.render(ini_settings)
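

# Hedged sketch of the render step above: the real lalinference.jinja2
# template ships with gwcelery and is not reproduced here, so this stand-in
# template (with made-up keys) only illustrates how ini_settings values are
# substituted into ini-file text by Jinja2.
def _example_ini_render():
    import jinja2
    template = jinja2.Template(
        '[paths]\nwebdir={{ webdir }}\n\n[analysis]\nchirp-mass={{ mc }}\n'
    )
    return template.render({'webdir': '/path/to/web', 'mc': 1.2})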


@app.task(shared=False)
def _setup_dag_for_lalinference(coinc, rundir, event, superevent_id):
    """Create DAG for a lalinference run and return the path to DAG.

    Parameters
    ----------
    coinc : byte contents
        Byte contents of ``coinc.xml``. The PSD is expected to be embedded.
    rundir : str
        The path to a run directory where the DAG file is created.
    event : dict
        The json contents of a target G event retrieved from
        gracedb.get_event(), whose mass and spin information are used to
        determine analysis settings.
    superevent_id : str
        The GraceDB ID of a target superevent

    Returns
    -------
    path_to_dag : str
        The path to the .dag file

    """
    # write down coinc.xml in the run directory
    path_to_coinc = os.path.join(rundir, 'coinc.xml')
    with open(path_to_coinc, 'wb') as f:
        f.write(coinc)

    # write down and upload ini file
    ini_contents = prepare_lalinference_ini(event, superevent_id)
    path_to_ini = os.path.join(rundir, 'online_lalinference_pe.ini')
    with open(path_to_ini, 'w') as f:
        f.write(ini_contents)
    gracedb.upload.delay(
        ini_contents, filename=os.path.basename(path_to_ini),
        graceid=superevent_id,
        message=('Automatically generated LALInference configuration file'
                 ' for this event.'),
        tags='pe')

    try:
        subprocess.run(
            ['lalinference_pipe', '--run-path', rundir,
             '--coinc', path_to_coinc, path_to_ini, '--psd', path_to_coinc],
            capture_output=True, check=True)
    except subprocess.CalledProcessError as e:
        # e.args[1] is the command line of the failed process
        contents = b'args:\n' + json.dumps(e.args[1]).encode('utf-8') + \
            b'\n\nstdout:\n' + e.stdout + b'\n\nstderr:\n' + e.stderr
        gracedb.upload.delay(
            filecontents=contents, filename='lalinference_dag.log',
            graceid=superevent_id,
            message='Failed to prepare DAG for lalinference', tags='pe'
        )
        raise

    return os.path.join(rundir, 'multidag.dag')


@app.task(shared=False)
def _setup_dag_for_bilby(
    coinc_bayestar, rundir, event, superevent_id, mode="production"
):
    """Create DAG for a bilby run and return the path to DAG.

    Parameters
    ----------
    coinc_bayestar : tuple
        Byte contents of ``coinc.xml`` and ``bayestar.multiorder.fits``.
    rundir : str
        The path to a run directory where the DAG file is created
    event : dict
        The json contents of a target G event retrieved from
        gracedb.get_event(), whose mass and spin information are used to
        determine analysis settings.
    superevent_id : str
        The GraceDB ID of a target superevent
    mode : str
        Analysis mode, allowed options are "production" and "fast_test",
        default is "production".

    Returns
    -------
    path_to_dag : str
        The path to the .dag file

    Notes
    -----
    `--channel-dict o3replay` is added to bilby_pipe_gracedb arguments when
    the gracedb host is different from `gracedb.ligo.org` or
    `gracedb-test.ligo.org`. Condor queue is set to `Online_PE` if gracedb
    host is `gracedb.ligo.org`, and `Online_PE_MDC` otherwise.

    """
    path_to_json = os.path.join(rundir, 'event.json')
    with open(path_to_json, 'w') as f:
        json.dump(event, f, indent=2)

    coinc, bayestar = coinc_bayestar
    path_to_psd = os.path.join(rundir, 'coinc.xml')
    with open(path_to_psd, 'wb') as f:
        f.write(coinc)
    path_to_bayestar = os.path.join(rundir, 'bayestar.multiorder.fits')
    with open(path_to_bayestar, 'wb') as f:
        f.write(bayestar)

    path_to_webdir = os.path.join(
        app.conf['pe_results_path'], superevent_id, 'bilby', mode
    )

    path_to_settings = os.path.join(rundir, 'settings.json')
    setup_arg = ['bilby_pipe_gracedb', '--webdir', path_to_webdir,
                 '--outdir', rundir, '--json', path_to_json,
                 '--psd-file', path_to_psd, '--skymap-file', path_to_bayestar,
                 '--settings', path_to_settings]
    settings = {'summarypages_arguments': {'gracedb': event['graceid'],
                                           'no_ligo_skymap': True},
                'accounting_user': 'soichiro.morisaki'}
    if app.conf['gracedb_host'] != 'gracedb.ligo.org':
        settings['queue'] = 'Online_PE_MDC'
    else:
        settings['queue'] = 'Online_PE'
        settings['accounting'] = 'ligo.prod.o4.cbc.pe.bilby'
    # FIXME: using live data for gracedb-test events should be reconsidered
    # when we have a better idea to differentiate MDC and real events.
    if app.conf['gracedb_host'] not in [
        'gracedb.ligo.org', 'gracedb-test.ligo.org'
    ]:
        setup_arg += ['--channel-dict', 'o3replay']

    trigger_chirp_mass = event['extra_attributes']['CoincInspiral']['mchirp']
    if trigger_chirp_mass < 0.6:
        raise ValueError(
            "No bilby settings available for trigger chirp mass of"
            f" {trigger_chirp_mass}Msun."
        )
    if mode == 'production':
        settings.update(
            {
                'sampler_kwargs': {'naccept': 60, 'nlive': 500,
                                   'npool': 24, 'sample': 'acceptance-walk'},
                'n_parallel': 3,
                'request_cpus': 24,
                'spline_calibration_nodes': 10,
                'request_memory_generation': 8.0
            }
        )
        # use low-spin IMRPhenomD below chirp mass of m1=3Msun, m2=1Msun
        # assuming binary neutron star
        if trigger_chirp_mass < 1.465:
            likelihood_mode = 'lowspin_phenomd_fhigh1024_roq'
            settings['sampler_kwargs']['naccept'] = 10
        # use IMRPhenomPv2 with mass ratio upper bound of 8 below chirp mass
        # of m1=8Msun, m2=1Msun
        elif trigger_chirp_mass < 2.243:
            likelihood_mode = 'phenompv2_bns_roq'
        # use IMRPhenomPv2 with mass ratio upper bound of 20 in chirp-mass
        # range where IMRPhenomXPHM ROQ bases are not available
        elif trigger_chirp_mass < 12:
            likelihood_mode = 'low_q_phenompv2_roq'
        else:
            likelihood_mode = 'phenomxphm_roq'
            settings['request_memory_generation'] = 36.0
            settings['request_memory'] = 16.0
        setup_arg += ['--cbc-likelihood-mode', likelihood_mode]
    elif mode == 'fast_test':
        setup_arg += ["--sampler-kwargs", "FastTest"]
        if trigger_chirp_mass < 3.9:
            setup_arg += ['--cbc-likelihood-mode', 'phenompv2_bns_roq']
            settings['request_memory_generation'] = 8.0
    else:
        raise ValueError(f"mode: {mode} not recognized.")

    with open(path_to_settings, 'w') as f:
        json.dump(settings, f, indent=2)

    try:
        subprocess.run(setup_arg, capture_output=True, check=True)
    except subprocess.CalledProcessError as e:
        contents = b'args:\n' + json.dumps(e.args[1]).encode('utf-8') + \
            b'\n\nstdout:\n' + e.stdout + b'\n\nstderr:\n' + e.stderr
        gracedb.upload.delay(
            filecontents=contents, filename='bilby_dag.log',
            graceid=superevent_id,
            message=f'Failed to prepare DAG for {mode}-mode bilby', tags='pe'
        )
        raise
    else:
        # Uploads bilby ini file to GraceDB
        with open(os.path.join(rundir, 'bilby_config.ini'), 'r') as f:
            ini_contents = f.read()
        if mode == 'production':
            filename = 'bilby_config.ini'
        else:
            filename = f'bilby_{mode}_config.ini'
        gracedb.upload.delay(
            ini_contents, filename=filename, graceid=superevent_id,
            message=(f'Automatically generated {mode}-mode Bilby'
                     ' configuration file for this event.'),
            tags='pe')

    path_to_dag, = glob.glob(os.path.join(rundir, 'submit/dag*.submit'))
    return path_to_dag
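

# Where the chirp-mass thresholds above come from: a minimal check, using the
# standard definition mchirp = (m1*m2)**(3/5) / (m1+m2)**(1/5), that the
# production-mode boundaries 1.465 and 2.243 match the component masses
# quoted in the comments (m1=3, m2=1 and m1=8, m2=1, in Msun).
def _example_chirp_mass_thresholds():
    def mchirp(m1, m2):
        return (m1 * m2) ** (3 / 5) / (m1 + m2) ** (1 / 5)
    assert abs(mchirp(3, 1) - 1.465) < 0.001
    assert abs(mchirp(8, 1) - 2.243) < 0.005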


@app.task(shared=False)
def _setup_dag_for_rapidpe(rundir, superevent_id, event):
    """Create DAG for a rapidpe run and return the path to DAG.

    Parameters
    ----------
    rundir : str
        The path to a run directory where the DAG file is created
    superevent_id : str
        The GraceDB ID of a target superevent
    event : dict
        The json contents of a target G event retrieved from
        gracedb.get_event(), whose SNR information is used to determine
        analysis settings.

    Returns
    -------
    path_to_dag : str
        The path to the .dag file

    """
    gracedb_host = app.conf['gracedb_host']

    settings = app.conf['rapidpe_settings']
    trigger_snr = event['extra_attributes']['CoincInspiral']['snr']
    high_snr_trigger = trigger_snr >= 37.5

    # dump ini file
    ini_template = env.get_template('rapidpe.jinja2')
    ini_contents = ini_template.render(
        {'rundir': rundir,
         'webdir': os.path.join(
             app.conf['pe_results_path'], superevent_id, 'rapidpe'
         ),
         'gracedb_url': f'https://{gracedb_host}/api',
         'superevent_id': superevent_id,
         'run_mode': settings['run_mode'],
         'frame_data_types': app.conf['low_latency_frame_types'],
         'accounting_group': settings['accounting_group'],
         'use_cprofile': settings['use_cprofile'],
         'gracedb_host': gracedb_host,
         'high_snr_trigger': high_snr_trigger,
         'getenv': RAPIDPE_GETENV,
         'environment': RAPIDPE_ENVIRONMENT})
    path_to_ini = os.path.join(rundir, 'rapidpe.ini')
    with open(path_to_ini, 'w') as f:
        f.write(ini_contents)
    gracedb.upload.delay(
        ini_contents, filename=os.path.basename(path_to_ini),
        graceid=superevent_id,
        message=('Automatically generated RapidPE-RIFT configuration file'
                 ' for this event.'),
        tags='pe')

    # set up dag
    try:
        subprocess.run(['rapidpe-rift-pipe', path_to_ini],
                       capture_output=True, check=True)
    except subprocess.CalledProcessError as e:
        contents = b'args:\n' + json.dumps(e.args[1]).encode('utf-8') + \
            b'\n\nstdout:\n' + e.stdout + b'\n\nstderr:\n' + e.stderr

        message = 'Failed to prepare DAG for Rapid PE'

        fail_gracefully = False
        if e.returncode == _RAPIDPE_NO_GSTLAL_TRIGGER_EXIT_CODE:
            fail_gracefully = True
            message += ": no GstLAL trigger available"

        gracedb.upload.delay(
            filecontents=contents, filename='rapidpe_dag.log',
            graceid=superevent_id,
            message=message, tags='pe'
        )

        if fail_gracefully:
            # Ends task but without logging as a failure
            raise Ignore()
        else:
            # Ends task with the unhandled error logged
            raise

    # return path to dag
    dag = os.path.join(rundir, "event_all_iterations.dag")
    return dag


@app.task(shared=False)
def _condor_no_submit(path_to_dag, include_env=None):
    """Run 'condor_submit_dag -no_submit' and return the path to .sub file."""
    args = ['condor_submit_dag']

    if include_env is not None:
        args += ['-include_env', ','.join(include_env)]

    args += ['-no_submit', path_to_dag]
    subprocess.run(args, capture_output=True, check=True)
    return '{}.condor.sub'.format(path_to_dag)
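

# Illustrative command line produced by the helper above (hypothetical DAG
# path, shortened -include_env list): condor_submit_dag writes the submit
# description next to the DAG, so the returned path is simply the input path
# plus the '.condor.sub' suffix.
#
#   condor_submit_dag -include_env PATH,PYTHONPATH \
#       -no_submit /path/to/rundir/event_all_iterations.dag
#   -> /path/to/rundir/event_all_iterations.dag.condor.sub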


def dag_prepare_task(rundir, event, superevent_id, pe_pipeline, **kwargs):
    """Return a canvas of tasks to prepare DAG.

    Parameters
    ----------
    rundir : str
        The path to a run directory where the DAG file is created
    event : dict
        The json contents of a target G event retrieved from
        gracedb.get_event(), whose mass and spin information are used to
        determine analysis settings.
    superevent_id : str
        The GraceDB ID of a target superevent
    pe_pipeline : str
        The parameter estimation pipeline used,
        lalinference, bilby, or rapidpe.

    Returns
    -------
    canvas : canvas of tasks
        The canvas of tasks to prepare DAG

    """
    # List of environment variables `condor_submit_dag` should be aware of.
    include_env = None

    if pe_pipeline == 'lalinference':
        canvas = gracedb.download.si('coinc.xml', event['graceid']) | \
            _setup_dag_for_lalinference.s(rundir, event, superevent_id)
    elif pe_pipeline == 'bilby':
        canvas = group(
            gracedb.download.si('coinc.xml', event['graceid']),
            gracedb.download.si('bayestar.multiorder.fits', event['graceid'])
        ) | _setup_dag_for_bilby.s(
            rundir, event, superevent_id, kwargs['bilby_mode']
        )
    elif pe_pipeline == 'rapidpe':
        canvas = _setup_dag_for_rapidpe.s(rundir, superevent_id, event)
        include_env = RAPIDPE_GETENV
    else:
        raise NotImplementedError(f'Unknown PE pipeline {pe_pipeline}.')

    canvas |= _condor_no_submit.s(include_env=include_env)

    return canvas
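

# How the canvas above chains, e.g. for bilby: the group's two download
# results arrive as a (coinc, bayestar) pair in the first positional argument
# of _setup_dag_for_bilby (a .s() signature receives the parent task's
# result, whereas .si() signatures are immutable and ignore it), and the DAG
# path returned by the setup task is then fed into _condor_no_submit:
#
#   group(download coinc.xml, download bayestar.multiorder.fits)
#       | _setup_dag_for_bilby.s(...) | _condor_no_submit.s(...)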


def _find_paths_from_name(directory, name):
    """Return the paths of files or directories with given name under the
    specified directory

    Parameters
    ----------
    directory : string
        Name of directory under which the target file or directory is
        searched for.
    name : string
        Name of target files or directories

    Returns
    -------
    paths : generator
        Paths to the target files or directories

    """
    return glob.iglob(os.path.join(directory, '**', name), recursive=True)
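

# A self-contained check of the recursive search above, on a throwaway
# directory tree with hypothetical file names: '**' with recursive=True
# matches at any depth, and shell-style wildcards in `name` are honored.
def _example_find_paths():
    import tempfile
    with tempfile.TemporaryDirectory() as d:
        os.makedirs(os.path.join(d, 'a', 'b'))
        open(os.path.join(d, 'a', 'b', 'job.err'), 'w').close()
        found = list(_find_paths_from_name(d, '*.err'))
        assert len(found) == 1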


@app.task(ignore_result=True, shared=False)
def _clean_up_bilby(rundir):
    """Remove large data products produced by bilby

    Parameters
    ----------
    rundir : str

    """
    for p in glob.glob(
        os.path.join(rundir, "data/*_generation_roq_weights.hdf5")
    ):
        os.remove(p)
    for p in glob.glob(
        os.path.join(rundir, "data/*_generation_data_dump.pickle")
    ):
        os.remove(p)


@app.task(ignore_result=True, shared=False)
def job_error_notification(request, exc, traceback,
                           superevent_id, rundir, analysis):
    """Upload notification when condor.submit terminates unexpectedly.

    Parameters
    ----------
    request : Context (placeholder)
        Task request variables
    exc : Exception
        Exception raised by condor.submit
    traceback : str (placeholder)
        Traceback message from a task
    superevent_id : str
        The GraceDB ID of a target superevent
    rundir : str
        The run directory for PE
    analysis : str
        Analysis name used as a label in uploaded messages

    Notes
    -----
    Some large bilby data products are cleaned up after the notification if
    the gracedb host is different from `gracedb.ligo.org`.

    """
    if isinstance(exc, condor.JobRunning):
        subprocess.run(['condor_rm', str(exc.args[0]['Cluster'])])
        canvas = gracedb.upload.si(
            filecontents=None, filename=None, graceid=superevent_id,
            tags='pe',
            message=f'The {analysis} condor job was aborted by gwcelery, '
                    'due to its long run time.'
        )
    elif isinstance(exc, condor.JobAborted):
        canvas = gracedb.upload.si(
            filecontents=None, filename=None, graceid=superevent_id,
            tags='pe',
            message=f'The {analysis} condor job was aborted.'
        )
    else:
        canvas = gracedb.upload.si(
            filecontents=None, filename=None, graceid=superevent_id,
            tags='pe',
            message=f'The {analysis} condor job failed.'
        )

    if analysis == "rapidpe":
        to_upload = [
            'event_all_iterations.dag.lib.err',
            'marginalize_extrinsic_parameters_iteration_*.dag.lib.err'
        ]
    else:
        to_upload = ['*.log', '*.err', '*.out']
    for filename in to_upload:
        tasks = []
        for path in _find_paths_from_name(rundir, filename):
            with open(path, 'rb') as f:
                contents = f.read()
            if contents:
                # put .log suffix in log file names so that users can
                # directly read the contents instead of downloading them
                # when they click file names
                tasks.append(gracedb.upload.si(
                    filecontents=contents,
                    filename=os.path.basename(path) + '.log',
                    graceid=superevent_id,
                    message=f'A log file for {analysis} condor job.',
                    tags='pe'
                ))
        canvas |= group(tasks)

    if "bilby" in analysis and app.conf['gracedb_host'] != 'gracedb.ligo.org':
        canvas |= _clean_up_bilby.si(rundir)

    canvas.delay()


def _upload_tasks_lalinference(rundir, superevent_id):
    """Return canvas of tasks to upload LALInference results

    Parameters
    ----------
    rundir : str
        The path to a run directory
    superevent_id : str
        The GraceDB ID of a target superevent

    Returns
    -------
    tasks : canvas
        The work-flow for uploading LALInference results

    """
    pe_results_path = os.path.join(
        app.conf['pe_results_path'], superevent_id, 'lalinference'
    )

    # posterior samples
    path, = glob.glob(
        os.path.join(rundir, '**', 'posterior*.hdf5'), recursive=True)
    with open(path, 'rb') as f:
        canvas = gracedb.upload.si(
            f.read(), 'LALInference.posterior_samples.hdf5',
            superevent_id, 'LALInference posterior samples', 'pe')

    # plots
    tasks = []
    for filename, message in [
        ('extrinsic.png', 'LALInference corner plot for extrinsic parameters'),
        ('intrinsic.png', 'LALInference corner plot for intrinsic parameters')
    ]:
        # Here we do not require that exactly one png file exists, so that
        # the posterior samples are uploaded regardless. The same applies to
        # the other files below.
        for path in _find_paths_from_name(pe_results_path, filename):
            with open(path, 'rb') as f:
                tasks.append(gracedb.upload.si(
                    f.read(), f'LALInference.{filename}', superevent_id,
                    message, 'pe'
                ))
    canvas |= group(tasks)

    # psd
    tasks = []
    for path in _find_paths_from_name(rundir, 'glitch_median_PSD_forLI_*.dat'):
        with open(path, 'r') as f:
            tasks.append(gracedb.upload.si(
                f.read(), os.path.basename(path), superevent_id,
                'Bayeswave PSD used for LALInference PE', 'pe'
            ))
    canvas |= group(tasks)

    # dag
    tasks = []
    for path in _find_paths_from_name(rundir, 'lalinference*.dag'):
        with open(path, 'r') as f:
            tasks.append(gracedb.upload.si(
                f.read(), os.path.basename(path), superevent_id,
                'LALInference DAG', 'pe'
            ))
    canvas |= group(tasks)

    # link to results page
    tasks = []
    for path in _find_paths_from_name(pe_results_path, 'posplots.html'):
        baseurl = urllib.parse.urljoin(
            app.conf['pe_results_url'],
            os.path.relpath(path, app.conf['pe_results_path'])
        )
        tasks.append(gracedb.upload.si(
            None, None, superevent_id,
            'Online lalinference parameter estimation finished. '
            f'<a href={baseurl}>results</a>'
        ))
    canvas |= group(tasks)

    return canvas


def _upload_tasks_bilby(rundir, superevent_id, mode):
    """Return canvas of tasks to upload Bilby results

    Parameters
    ----------
    rundir : str
        The path to a run directory
    superevent_id : str
        The GraceDB ID of a target superevent
    mode : str
        Analysis mode

    Returns
    -------
    tasks : canvas
        The work-flow for uploading Bilby results

    Notes
    -----
    Some large bilby data products are cleaned up after the posterior file
    is uploaded if the gracedb host is different from `gracedb.ligo.org`.

    """
    # convert bilby sample file into one compatible with ligo-skymap
    samples_dir = os.path.join(rundir, 'final_result')
    if mode == 'production':
        samples_filename = 'Bilby.posterior_samples.hdf5'
    else:
        samples_filename = f'Bilby.{mode}.posterior_samples.hdf5'
    out_samples = os.path.join(samples_dir, samples_filename)
    in_samples, = glob.glob(os.path.join(samples_dir, '*result.hdf5'))
    subprocess.run(
        ['bilby_pipe_to_ligo_skymap_samples', in_samples, '--out', out_samples]
    )

    with open(out_samples, 'rb') as f:
        canvas = gracedb.upload.si(
            f.read(), samples_filename,
            superevent_id, f'{mode}-mode Bilby posterior samples', 'pe')

    if app.conf['gracedb_host'] != 'gracedb.ligo.org':
        canvas |= _clean_up_bilby.si(rundir)

    # pesummary
    pesummary_kwargs = {}
    path_to_ini, = glob.glob(os.path.join(rundir, "*_complete.ini"))
    pesummary_kwargs["config"] = path_to_ini
    config_parser = BilbyConfigFileParser()
    with open(path_to_ini, "r") as f:
        config_content, _, _, _ = config_parser.parse(f)
    pesummary_kwargs["psd"] = convert_string_to_dict(
        config_content["psd-dict"]
    )
    pesummary_kwargs["calibration"] = convert_string_to_dict(
        config_content["spline-calibration-envelope-dict"]
    )
    pesummary_kwargs["approximant"] = config_content["waveform-approximant"]
    pesummary_kwargs["f_low"] = config_content["minimum-frequency"]
    pesummary_kwargs["f_ref"] = config_content["reference-frequency"]
    pesummary_kwargs["label"] = "online"

    webdir = os.path.join(config_content["webdir"], 'pesummary')
    url = urllib.parse.urljoin(
        app.conf['pe_results_url'],
        os.path.relpath(
            os.path.join(webdir, 'home.html'),
            app.conf['pe_results_path']
        )
    )
    canvas = group(
        canvas,
        _pesummary_task(webdir, in_samples, **pesummary_kwargs)
        |
        gracedb.upload.si(
            None, None, superevent_id,
            f'PESummary page for {mode}-mode Bilby is available '
            f'<a href={url}>here</a>',
            'pe'
        )
    )

    return canvas
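

# Hedged sketch of the ini parsing above: bilby_pipe stores per-detector
# settings as dict-like strings, and convert_string_to_dict turns such a
# string (hypothetical paths here; exact formatting follows bilby_pipe's ini
# conventions) into a plain Python dict keyed by detector, e.g.
#
#   convert_string_to_dict('{H1:/path/h1_psd.dat, L1:/path/l1_psd.dat}')
#   -> {'H1': '/path/h1_psd.dat', 'L1': '/path/l1_psd.dat'}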


def _upload_tasks_rapidpe(rundir, superevent_id):
    """Return canvas of tasks to upload RapidPE-RIFT results

    Parameters
    ----------
    rundir : str
        The path to a run directory
    superevent_id : str
        The GraceDB ID of a target superevent

    Returns
    -------
    tasks : canvas
        The work-flow for uploading RapidPE results

    """
    summary_path = os.path.join(rundir, "summary")

    url = urllib.parse.urljoin(
        app.conf['pe_results_url'],
        os.path.join(superevent_id, 'rapidpe', 'summarypage.html')
    )
    canvas = gracedb.upload.si(
        None, None, superevent_id,
        f'Summary page for RapidPE-RIFT is available <a href={url}>here</a>',
        ('pe',))
    to_upload = [
        (
            "p_astro.json", "RapidPE_RIFT.p_astro.json",
            "RapidPE-RIFT Pastro results",
            ("pe", "p_astro", "public"),
        ),
    ]
    tasks = []
    for src_basename, dst_filename, description, tags in to_upload:
        src_filename = os.path.join(summary_path, src_basename)
        if os.path.isfile(src_filename):
            with open(src_filename, "rb") as f:
                tasks.append(
                    gracedb.upload.si(
                        f.read(), dst_filename,
                        superevent_id, description, tags))

    canvas |= group(tasks)

    return canvas


@app.task(ignore_result=True, shared=False)
def dag_finished(rundir, superevent_id, pe_pipeline, **kwargs):
    """Upload PE results

    Parameters
    ----------
    rundir : str
        The path to a run directory where the DAG file exists
    superevent_id : str
        The GraceDB ID of a target superevent
    pe_pipeline : str
        The parameter estimation pipeline used,
        lalinference, bilby, or rapidpe.

    """
    if pe_pipeline == 'lalinference':
        canvas = _upload_tasks_lalinference(rundir, superevent_id)
    elif pe_pipeline == 'bilby':
        canvas = _upload_tasks_bilby(
            rundir, superevent_id, kwargs['bilby_mode'])
    elif pe_pipeline == 'rapidpe':
        canvas = _upload_tasks_rapidpe(rundir, superevent_id)
    else:
        raise NotImplementedError(f'Unknown PE pipeline {pe_pipeline}.')

    canvas.delay()

    # NOTE: check if this should include rapidpe as well
    if pe_pipeline == 'bilby':
        gracedb.create_label.delay('PE_READY', superevent_id)


def _pesummary_task(webdir, samples, **pesummary_kwargs):
    """Return a celery task to submit a pesummary condor job.

    Parameters
    ----------
    webdir : str
        output directory
    samples : str
        path to posterior sample file
    **pesummary_kwargs
        Extra arguments of summarypages

    Returns
    -------
    celery task

    Notes
    -----
    `--disable_interactive --disable_expert` are added and `--redshift_method
    exact --evolve_spins_forwards` are not added to `summarypages` arguments
    when the gracedb host is different from `gracedb.ligo.org`. Condor queue
    is set to `Online_PE` if gracedb host is `gracedb.ligo.org`, and
    `Online_PE_MDC` otherwise.

    """
    args = [
        "summarypages", "--webdir", webdir, "--samples", samples, "--gw",
        "--no_ligo_skymap", "--multi_process", "6"
    ]
    for key in pesummary_kwargs:
        if key in ["psd", "calibration"]:
            args += [f"--{key}"]
            for ifo in pesummary_kwargs[key]:
                args += [f'{ifo}:{pesummary_kwargs[key][ifo]}']
        else:
            args += [f"--{key}", pesummary_kwargs[key]]
    condor_kwargs = dict(
        request_memory=16000, request_disk=5000, request_cpus=6,
        accounting_group_user='soichiro.morisaki'
    )
    if app.conf['gracedb_host'] != 'gracedb.ligo.org':
        condor_kwargs['accounting_group'] = 'ligo.dev.o4.cbc.pe.bilby'
        condor_kwargs['requirements'] = '((TARGET.Online_PE_MDC =?= True))'
        condor_kwargs['+Online_PE_MDC'] = True
        args += ["--disable_interactive", "--disable_expert"]
    else:
        condor_kwargs['accounting_group'] = 'ligo.prod.o4.cbc.pe.bilby'
        condor_kwargs['requirements'] = '((TARGET.Online_PE =?= True))'
        condor_kwargs['+Online_PE'] = True
        args += ["--redshift_method", "exact", "--evolve_spins_forwards"]
    return condor.check_output.si(args, **condor_kwargs)
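

# Minimal sketch of the argument expansion above (hypothetical file names):
# a dict-valued kwarg such as {"psd": {"H1": "h1_psd.dat", "L1": "l1_psd.dat"}}
# becomes ["--psd", "H1:h1_psd.dat", "L1:l1_psd.dat"], while scalar kwargs
# become ordinary ["--key", "value"] pairs.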


# Modified version of condor.submit task with retry kwargs overridden with
# RapidPE-specific settings.
submit_rapidpe = app.task(
    **condor.submit_kwargs,
    **app.conf['rapidpe_condor_retry_kwargs'],
)(condor.submit.run)
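

# The pattern above re-registers an existing task body under new settings:
# condor.submit.run is the plain function wrapped by the condor.submit task,
# so calling app.task(...) on it creates a second, independently registered
# task that shares the implementation but uses RapidPE's retry policy.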


@app.task(ignore_result=True, shared=False)
def start_pe(event, superevent_id, pe_pipeline):
    """Run Parameter Estimation on a given event.

    Parameters
    ----------
    event : dict
        The json contents of a target G event retrieved from
        gracedb.get_event(), whose mass and spin information are used to
        determine analysis settings.
    superevent_id : str
        The GraceDB ID of a target superevent
    pe_pipeline : str
        The parameter estimation pipeline used,
        lalinference, bilby, or rapidpe.

    """
    # make an event directory
    pipeline_dir = os.path.expanduser('~/.cache/{}'.format(pe_pipeline))
    mkpath(pipeline_dir)
    event_dir = os.path.join(pipeline_dir, superevent_id)

    if pe_pipeline == 'bilby':
        if (
            app.conf['gracedb_host'] == 'gracedb-playground.ligo.org' and
            event['extra_attributes']['CoincInspiral']['mchirp'] >= 12
        ):
            # Count the number of BBH jobs and do not start a run if it
            # exceeds 5 so that we do not use up disk space. We assume that
            # the job is running if a data dump pickle file exists under the
            # run directory, which is the largest file produced by PE and
            # removed when the run completes.
            number_of_bbh_running = 0
            for p in glob.glob(
                os.path.join(
                    pipeline_dir,
                    "*/*/data/*_generation_data_dump.pickle"
                )
            ):
                path_to_ev = os.path.join(os.path.dirname(p), "../event.json")
                if os.path.exists(path_to_ev):
                    with open(path_to_ev, "r") as f:
                        ev = json.load(f)
                    mc = ev['extra_attributes']['CoincInspiral']['mchirp']
                    if mc >= 12:
                        number_of_bbh_running += 1
            if number_of_bbh_running > 5:
                gracedb.upload.delay(
                    filecontents=None, filename=None, graceid=superevent_id,
                    message='Parameter estimation will not start to save '
                            f'disk space (There are {number_of_bbh_running} '
                            'BBH jobs running).',
                    tags='pe'
                )
                return
        modes = ["production"]
        rundirs = [os.path.join(event_dir, m) for m in modes]
        kwargs_list = [{'bilby_mode': m} for m in modes]
        analyses = [f'{m}-mode bilby' for m in modes]
        condor_submit_task = condor.submit
    elif pe_pipeline == 'rapidpe':
        rundirs = [event_dir]
        kwargs_list = [{'event_pipeline': event["pipeline"]}]
        analyses = [pe_pipeline]
        condor_submit_task = submit_rapidpe
    else:
        rundirs = [event_dir]
        kwargs_list = [{}]
        analyses = [pe_pipeline]
        condor_submit_task = condor.submit

    os.mkdir(event_dir)
    for rundir, kwargs, analysis in zip(rundirs, kwargs_list, analyses):
        mkpath(rundir)

        gracedb.upload.delay(
            filecontents=None, filename=None, graceid=superevent_id,
            message=(f'Starting {analysis} parameter estimation '
                     f'for {event["graceid"]}'),
            tags='pe'
        )

        # prepare the DAG, submit it to HTCondor (uploading logs on error),
        # and upload the results once the DAG finishes
        (
            dag_prepare_task(
                rundir, event, superevent_id, pe_pipeline, **kwargs
            )
            |
            condor_submit_task.s().on_error(
                job_error_notification.s(superevent_id, rundir, analysis)
            )
            |
            dag_finished.si(rundir, superevent_id, pe_pipeline, **kwargs)
        ).delay()