Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1"""Tasks that comprise the alert orchestrator. 

2 

3The orchestrator is responsible for the vetting and annotation workflow to 

4produce preliminary, initial, and update alerts for gravitational-wave event 

5candidates. 

6""" 

7import io 

8import json 

9import re 

10 

11from celery import group 

12import h5py 

13 

14from ..import app 

15from . import bayestar 

16from . import circulars 

17from .core import identity, ordered_group, ordered_group_first 

18from . import detchar 

19from . import em_bright 

20from . import gcn 

21from . import gracedb 

22from . import inference 

23from . import lvalert 

24from . import p_astro 

25from . import skymaps 

26from . import superevents 

27 

28 

@lvalert.handler('superevent',
                 'mdc_superevent',
                 shared=False)
def handle_superevent(alert):
    """Dispatch follow-up workflows for superevent LVAlert messages.

    The action depends on ``alert['alert_type']``:

    ``new``
        After a countdown of ``app.conf['pe_timeout']``, look up the preferred
        event and the lowest FAR of the superevent and pass both to
        :meth:`~gwcelery.tasks.orchestrator.parameter_estimation`.
        Independently, create omegascans and run
        :meth:`gwcelery.tasks.detchar.check_vectors` on the preferred event.

    ``label_added``
        * :obj:`superevents.FROZEN_LABEL`: leave a log message, rerun
          ``check_vectors`` and call
          :meth:`~gwcelery.tasks.orchestrator.preliminary_alert`.
        * :obj:`superevents.READY_LABEL`: after a countdown of
          ``app.conf['subthreshold_annotation_timeout']``, run
          ``preliminary_alert`` with the ``'subthreshold.'`` annotation prefix
          and without initiating a VOEvent.
        * ``GCN_PRELIM_SENT``: after a countdown of
          ``app.conf['superevent_clean_up_timeout']``, reselect the preferred
          event, update the superevent and run ``preliminary_alert`` again.
        * ``ADVOK`` / ``ADVNO``: call
          :meth:`~gwcelery.tasks.orchestrator.initial_alert` or
          :meth:`~gwcelery.tasks.orchestrator.retraction_alert`.

    ``event_added``
        If the superevent carries the ``DQV`` label, rerun ``check_vectors``
        for the new preferred event and then call ``_update_if_dqok``.

    Any other alert type or label is ignored.
    """
    superevent_id = alert['uid']
    alert_type = alert['alert_type']

    if alert_type == 'new':
        # Parameter estimation: wait for the preferred event to settle, then
        # collect (lowest FAR, preferred event) for parameter_estimation.
        pe_canvas = (
            _get_preferred_event.si(superevent_id).set(
                countdown=app.conf['pe_timeout'])
            |
            ordered_group(
                _get_lowest_far.si(superevent_id),
                gracedb.get_event.s())
            |
            parameter_estimation.s(superevent_id)
        )
        pe_canvas.apply_async()

        # Detector characterization: omegascans plus a check_vectors pass on
        # the preferred event, launched as two independent group members.
        superevent = alert['object']
        omegascan = detchar.omegascan.si(superevent['t_0'], superevent_id)
        dq_check = (
            gracedb.get_event.si(superevent['preferred_event'])
            |
            detchar.check_vectors.s(
                superevent_id,
                superevent['t_start'],
                superevent['t_end'])
        )
        group(omegascan, dq_check).delay()

    elif alert_type == 'label_added':
        label_name = alert['data']['name']

        if label_name == superevents.FROZEN_LABEL:
            superevent = alert['object']
            frozen_canvas = (
                gracedb.get_event.s(superevent['preferred_event'])
                |
                _leave_log_message_and_return_event_dict.s(
                    superevent_id,
                    "Automated DQ check before sending preliminary alert. "
                    "New results supersede old results.",
                    tags=['data_quality'])
                |
                detchar.check_vectors.s(
                    superevent_id,
                    superevent['t_start'],
                    superevent['t_end'])
                |
                preliminary_alert.s(superevent_id)
            )
            frozen_canvas.apply_async()

        elif label_name == superevents.READY_LABEL:
            subthreshold_canvas = (
                _get_preferred_event.si(superevent_id).set(
                    countdown=app.conf['subthreshold_annotation_timeout'])
                |
                gracedb.get_event.s()
                |
                preliminary_alert.s(superevent_id,
                                    annotation_prefix='subthreshold.',
                                    initiate_voevent=False)
            )
            subthreshold_canvas.apply_async()

        elif label_name == 'GCN_PRELIM_SENT':
            # Launch the second preliminary alert. The GraceDB query is
            # narrowed to the superevent's category for MDC and Test events.
            query = f'superevent: {superevent_id}'
            category = alert['object']['category']
            if category in ('MDC', 'Test'):
                query += ' ' + category

            clean_up_canvas = (
                gracedb.get_events.si(query).set(
                    countdown=app.conf['superevent_clean_up_timeout'])
                |
                superevents.select_preferred_event.s()
                |
                _update_superevent_and_return_event_dict.s(superevent_id)
                |
                _leave_log_message_and_return_event_dict.s(
                    superevent_id,
                    "Superevent cleaned up.")
                |
                preliminary_alert.s(superevent_id)
            )
            clean_up_canvas.apply_async()

        elif label_name == 'ADVOK':
            # Advocate approved: initial alert, with all filenames looked up
            # from the superevent log.
            initial_alert((None, None, None), superevent_id,
                          labels=alert['object']['labels'])

        elif label_name == 'ADVNO':
            # Advocate rejected: retraction.
            retraction_alert(superevent_id)

    elif alert_type == 'event_added':
        data = alert['data']
        new_event_id = data['preferred_event']
        start = data['t_start']
        end = data['t_end']

        # Only superevents vetoed by data quality need a fresh check.
        if 'DQV' not in gracedb.get_labels(superevent_id):
            return

        (
            gracedb.get_event.s(new_event_id)
            |
            detchar.check_vectors.s(superevent_id, start, end)
            |
            _update_if_dqok.si(superevent_id, new_event_id)
        ).apply_async()

151 

152 

@lvalert.handler('cbc_gstlal',
                 'cbc_spiir',
                 'cbc_pycbc',
                 'cbc_mbtaonline',
                 shared=False)
def handle_cbc_event(alert):
    """Perform annotations for CBC events that depend on pipeline-specific
    matched-filter parameter estimates.

    Notes
    -----
    For ``new`` alerts, source properties are always computed; ``p_astro`` is
    computed for every pipeline except gstlal (and for gstlal when the search
    is ``MDC``); and BAYESTAR is started immediately for PyCBC, whose initial
    upload already contains the PSD. For all other pipelines BAYESTAR is
    started by the ``log`` alert announcing the file ``psd.xml.gz``.

    The table below lists which files are created as a result, and which
    tasks generate them.

    ============================== ================================================
    File                           Task
    ============================== ================================================
    ``bayestar.multiorder.fits``   :meth:`gwcelery.tasks.bayestar.localize`
    ``em_bright.json``             ``em_bright.classifier_gstlal`` / ``classifier_other``
    ``p_astro.json``               :meth:`gwcelery.tasks.p_astro.compute_p_astro`
    ============================== ================================================

    """  # noqa: E501
    graceid = alert['uid']
    event = alert['object']
    pipeline = event['pipeline'].lower()
    # Events that should be published are scheduled with priority 0, all
    # others with priority 1.
    priority = 0 if superevents.should_publish(event) else 1
    alert_type = alert['alert_type']

    def localize(*input_filenames):
        """Download the named files, run BAYESTAR and upload the sky map."""
        (
            group(*[gracedb.download.s(name, graceid)
                    for name in input_filenames])
            |
            bayestar.localize.s(graceid)
            |
            gracedb.upload.s(
                'bayestar.multiorder.fits', graceid,
                'sky localization complete', ['sky_loc', 'public']
            )
            |
            gracedb.create_label.si('SKYMAP_READY', graceid)
        ).apply_async(priority=priority)

    if alert_type == 'new':
        instruments = superevents.get_instruments_in_ranking_statistic(event)
        extra_attributes = event['extra_attributes']
        snr = superevents.get_snr(event)
        far = event['far']
        # Template parameters are read from the first SingleInspiral row.
        first_row = extra_attributes['SingleInspiral'][0]
        mass1, mass2, chi1, chi2 = (
            first_row[key] for key in ('mass1', 'mass2', 'spin1z', 'spin2z'))

        # Source properties (em_bright), with a gstlal-specific classifier.
        if pipeline == 'gstlal':
            em_bright_task = em_bright.classifier_gstlal
        else:
            em_bright_task = em_bright.classifier_other

        (
            em_bright_task.si(mass1, mass2, chi1, chi2, snr)
            |
            gracedb.upload.s(
                'em_bright.json', graceid,
                'em bright complete', ['em_bright', 'public']
            )
            |
            gracedb.create_label.si('EMBRIGHT_READY', graceid)
        ).apply_async(priority=priority)

        # p_astro calculation for pipelines other than gstlal, and for MDC
        # events of any pipeline.
        if pipeline != 'gstlal' or event['search'] == 'MDC':
            (
                p_astro.compute_p_astro.s(
                    snr, far, mass1, mass2, pipeline, instruments)
                |
                gracedb.upload.s(
                    'p_astro.json', graceid,
                    'p_astro computation complete', ['p_astro', 'public']
                )
                |
                gracedb.create_label.si('PASTRO_READY', graceid)
            ).apply_async(priority=priority)

        # Start BAYESTAR for PyCBC.
        # PyCBC includes the PSD data in the initial upload,
        # so we just download the coinc.xml file.
        if pipeline == 'pycbc':
            localize('coinc.xml')

    elif alert_type == 'log':
        filename = alert['data']['filename']

        # Start BAYESTAR for any pipeline *except* PyCBC.
        # All pipelines but PyCBC upload the PSD in a separate file,
        # psd.xml.gz. For those pipelines, BAYESTAR must download coinc.xml
        # *and* psd.xml.gz.
        #
        # FIXME: The separate psd.xml.gz upload adds an extra couple seconds
        # of latency due to the additional GraceDB transactions and ping
        # times. If other pipelines were able to add the PSD to the initial
        # upload, then we could cut down on the alert latency by a couple
        # seconds.
        if pipeline != 'pycbc' and filename == 'psd.xml.gz':
            localize('coinc.xml', 'psd.xml.gz')

273 

274 

@app.task(shared=False)
def _remove_duplicate_meta(hdf5):
    """Strip the 'nLocalTemps' and 'randomSeed' attributes from the
    ``posterior_samples`` dataset of a posterior sample file.

    These two names are duplicated in the metadata and the column names of
    the file, which causes failure in the skymap generation.

    FIXME: See https://git.ligo.org/lscsoft/lalsuite/issues/250.

    Parameters
    ----------
    hdf5 : bytes
        Contents of the HDF5 posterior sample file.

    Returns
    -------
    bytes
        Contents of the file after the attributes have been removed. If the
        ``lalinference/lalinference_mcmc`` group is absent, no attribute is
        touched.
    """
    buffer = io.BytesIO(hdf5)
    with h5py.File(buffer, "r+") as h5file:
        try:
            mcmc_group = h5file['lalinference']['lalinference_mcmc']
        except KeyError:
            # The group is absent: there is nothing to strip.
            mcmc_group = None

        if mcmc_group is not None:
            attrs = mcmc_group['posterior_samples'].attrs
            for duplicated_key in ('nLocalTemps', 'randomSeed'):
                attrs.pop(duplicated_key, None)
    return buffer.getvalue()

294 

295 

@lvalert.handler('superevent',
                 'mdc_superevent',
                 shared=False)
def handle_posterior_samples(alert):
    """Generate multi-resolution and flat-resolution fits files and skymaps
    from an uploaded HDF5 file containing posterior samples.

    Only ``log`` alerts whose filename ends in ``.posterior_samples.hdf5`` are
    processed; everything else is ignored. Two independent canvases are
    launched:

    1. Download the samples, build a sky map from them, then annotate it and
       upload it both as a multi-resolution and as a flattened FITS file.
    2. Download the samples and upload the source properties computed from
       them as ``<prefix>.em_bright.json``.

    ``<prefix>`` is the part of the filename before the final
    ``.posterior_samples.`` marker.
    """
    if alert['alert_type'] != 'log' or \
            not alert['data']['filename'].endswith('.posterior_samples.hdf5'):
        return
    superevent_id = alert['uid']
    filename = alert['data']['filename']
    info = '{} {}'.format(alert['data']['comment'], filename)
    # Split on the last marker only, so that a prefix which itself contains
    # '.posterior_samples.' does not break the two-name unpacking.
    prefix, _ = filename.rsplit('.posterior_samples.', 1)

    flat_filename = '{}.fits.gz'.format(prefix)
    skymap_tags = ['pe', 'sky_loc', 'public']

    # FIXME: _remove_duplicate_meta should be omitted as soon as
    # https://git.ligo.org/lscsoft/lalsuite/issues/250 is fixed.
    (
        gracedb.download.si(filename, superevent_id)
        |
        _remove_duplicate_meta.s()
        |
        skymaps.skymap_from_samples.s()
        |
        group(
            skymaps.annotate_fits.s(
                flat_filename,
                superevent_id, list(skymap_tags)
            ),

            gracedb.upload.s(
                '{}.multiorder.fits'.format(prefix), superevent_id,
                'Multiresolution fits file generated from "{}"'.format(info),
                list(skymap_tags)
            ),

            skymaps.flatten.s(flat_filename)
            |
            gracedb.upload.s(
                flat_filename, superevent_id,
                'Flat-resolution fits file created from "{}"'.format(info),
                list(skymap_tags)
            )
        )
    ).delay()

    # em_bright from LALInference posterior samples
    (
        gracedb.download.si(filename, superevent_id)
        |
        em_bright.em_bright_posterior_samples.s()
        |
        gracedb.upload.s(
            '{}.em_bright.json'.format(prefix), superevent_id,
            'em-bright computed from "{}"'.format(info)
        )
    ).delay()

353 

354 

@app.task(shared=False, ignore_result=True)
def _update_if_dqok(superevent_id, event_id):
    """Make `event_id` the preferred event of `superevent_id`, but only if
    the superevent currently carries the `DQOK` label.

    When the update is made, a log message recording it is also queued for
    the superevent.
    """
    if 'DQOK' not in gracedb.get_labels(superevent_id):
        return

    gracedb.update_superevent(superevent_id, preferred_event=event_id)
    log_message = f'DQOK applied based on new event {event_id}'
    gracedb.create_log.delay(log_message, superevent_id)

364 

365 

@gracedb.task(shared=False)
def _get_preferred_event(superevent_id):
    """Look up the preferred event of a superevent in GraceDB.

    This performs the same query as
    :func:`gwcelery.tasks.gracedb.get_superevent`, but returns only the
    ``preferred_event`` entry of the response instead of the entire GraceDB
    JSON response.
    """
    # FIXME: remove ._orig_run when this bug is fixed:
    # https://github.com/getsentry/sentry-python/issues/370
    superevent = gracedb.get_superevent._orig_run(superevent_id)
    return superevent['preferred_event']

377 

378 

@gracedb.task(shared=False)
def _create_voevent(classification, *args, **kwargs):
    r"""Create a VOEvent record, merging in source classification and source
    properties from JSON strings.

    Parameters
    ----------
    classification : tuple, None
        A collection of JSON strings, generated by
        :meth:`gwcelery.tasks.em_bright.classifier` and
        :meth:`gwcelery.tasks.p_astro.compute_p_astro` or
        content of ``p_astro.json`` uploaded by gstlal respectively;
        or None. Entries that are None are skipped.
    \*args
        Additional positional arguments passed to
        :meth:`gwcelery.tasks.gracedb.create_voevent`.
    \*\*kwargs
        Additional keyword arguments passed to
        :meth:`gwcelery.tasks.gracedb.create_voevent`. If ``skymap_filename``
        is given and ``skymap_type`` is not, ``skymap_type`` is derived from
        the filename by dropping its ``.fits`` extension and anything after
        it.

    Returns
    -------
    str
        The filename of the newly created VOEvent.

    """
    voevent_kwargs = dict(kwargs)

    # Merge source classification and source properties into the keyword
    # arguments; every non-None entry is a JSON object.
    if classification is not None:
        for text in classification:
            if text is None:
                continue
            voevent_kwargs.update(json.loads(text))

    # FIXME: These keys differ between em_bright.json
    # and the GraceDB REST API.
    renamed_keys = (('HasNS', 'ProbHasNS'), ('HasRemnant', 'ProbHasRemnant'))
    for json_key, api_key in renamed_keys:
        if json_key in voevent_kwargs:
            voevent_kwargs[api_key] = voevent_kwargs.pop(json_key)

    skymap_filename = voevent_kwargs.get('skymap_filename')
    if skymap_filename is not None:
        voevent_kwargs.setdefault(
            'skymap_type', re.sub(r'\.fits(\..+)?$', '', skymap_filename))

    # FIXME: remove ._orig_run when this bug is fixed:
    # https://github.com/getsentry/sentry-python/issues/370
    return gracedb.create_voevent._orig_run(*args, **voevent_kwargs)

432 

433 

@gracedb.task(shared=False)
def _create_label_and_return_filename(filename, label, graceid):
    """Queue the creation of `label` on `graceid` and pass `filename`
    through unchanged.

    This lets a label be applied in the middle of a chain without losing the
    filename that the next task in the chain expects.
    """
    # The label is created asynchronously; this task does not wait for it.
    gracedb.create_label.delay(label, graceid)
    return filename

438 

439 

@gracedb.task(shared=False)
def _leave_log_message_and_return_event_dict(event, superevent_id,
                                             message, **kwargs):
    """Wrapper around :meth:`gracedb.upload` that leaves a log message on
    the superevent and returns the event dictionary.

    The message is uploaded without any file attached. Extra keyword
    arguments (for example ``tags``) are forwarded to the upload task.
    """
    # No file contents and no filename: only the log message is recorded.
    log_args = (None, None, superevent_id, message)
    gracedb.upload.delay(*log_args, **kwargs)
    return event

448 

449 

@gracedb.task(shared=False)
def _update_superevent_and_return_event_dict(event, superevent_id):
    """Wrapper around :meth:`gracedb.update_superevent` that sets the
    superevent's preferred event to `event` and returns the event dictionary.
    """
    new_preferred_event = event['graceid']
    gracedb.update_superevent(superevent_id,
                              preferred_event=new_preferred_event)
    return event

458 

459 

@gracedb.task(shared=False)
def _proceed_if_no_advocate_action(filenames, superevent_id):
    """Return `filenames` unless the superevent carries a label indicating
    advocate action (``ADVOK`` or ``ADVNO``), in which case return None.

    Either way, a log message describing the decision is queued for the
    superevent.
    """
    superevent_labels = gracedb.get_labels(superevent_id)
    blocking_labels = {'ADVOK', 'ADVNO'}.intersection(
        superevent_labels)

    if not blocking_labels:
        gracedb.upload.delay(None, None, superevent_id,
                             "Sending preliminary notice")
        return filenames

    # An advocate has already acted: returning None tells the next task in
    # the chain not to send an automated notice.
    gracedb.upload.delay(
        None, None, superevent_id,
        f"Blocking automated notice due to labels {blocking_labels}"
    )
    return None

478 

479 

@app.task(ignore_result=True, shared=False)
def preliminary_alert(event, superevent_id, annotation_prefix='',
                      initiate_voevent=True):
    """Produce a preliminary alert by copying any sky maps.

    This consists of the following steps:

    1.  Copy any sky maps and source classification from the preferred event
        to the superevent.
    2.  Create standard annotations for sky maps including all-sky plots by
        calling :meth:`gwcelery.tasks.skymaps.annotate_fits`.
    3.  Create a preliminary VOEvent.
    4.  Send the VOEvent to GCN.
    5.  Apply the GCN_PRELIM_SENT label to the superevent.
    6.  Create and upload a GCN Circular draft.

    Steps 3-6 are only scheduled when `initiate_voevent` is true and the
    event is publishable (it passes :func:`superevents.should_publish` and has
    neither the ``DQV`` nor the ``INJ`` label).

    Parameters
    ----------
    event : dict
        The preferred event, as returned by GraceDB.
    superevent_id : str
        The superevent ID.
    annotation_prefix : str
        Prefix prepended to the names of all files uploaded to the
        superevent. When non-empty, the uploads are not tagged ``public``.
    initiate_voevent : bool
        Whether a VOEvent and GCN Circular draft may be created.
    """
    priority = 0 if superevents.should_publish(event) else 1
    preferred_event_id = event['graceid']

    if event['group'] == 'CBC':
        skymap_filename = 'bayestar.multiorder.fits'
    elif event['pipeline'] == 'CWB':
        skymap_filename = 'cWB.fits.gz'
    elif event['pipeline'] == 'oLIB':
        skymap_filename = 'oLIB.fits.gz'
    else:
        skymap_filename = None

    # Derive the name of the flattened, gzip-compressed sky map. Events with
    # no known sky map (None) must skip this, otherwise the string methods
    # below raise AttributeError.
    original_skymap_filename = skymap_filename
    if skymap_filename is not None:
        if skymap_filename.endswith('.multiorder.fits'):
            skymap_filename = skymap_filename.replace(
                '.multiorder.fits', '.fits')
        if skymap_filename.endswith('.fits'):
            skymap_filename += '.gz'

    # Determine if the event should be made public.
    is_publishable = (superevents.should_publish(event)
                      and {'DQV', 'INJ'}.isdisjoint(event['labels']))

    def tags_for(tag):
        """Return the upload tags: prefixed annotations are not public."""
        return [tag] if annotation_prefix else [tag, 'public']

    # Each member yields a filename, or None when there is nothing to copy,
    # in the order (sky map, em_bright, p_astro).
    canvas = ordered_group(
        (
            gracedb.download.si(original_skymap_filename, preferred_event_id)
            |
            ordered_group_first(
                skymaps.flatten.s(annotation_prefix + skymap_filename)
                |
                gracedb.upload.s(
                    annotation_prefix + skymap_filename,
                    superevent_id,
                    message='Flattened from multiresolution file {}'.format(
                        original_skymap_filename),
                    tags=tags_for('sky_loc')
                )
                |
                _create_label_and_return_filename.s(
                    'SKYMAP_READY', superevent_id
                ),

                gracedb.upload.s(
                    annotation_prefix + original_skymap_filename,
                    superevent_id,
                    message='Localization copied from {}'.format(
                        preferred_event_id),
                    tags=tags_for('sky_loc')
                ),

                skymaps.annotate_fits.s(
                    annotation_prefix + skymap_filename,
                    superevent_id,
                    tags_for('sky_loc')
                )
            )
        ) if skymap_filename is not None else identity.s(None),

        (
            gracedb.download.si('em_bright.json', preferred_event_id)
            |
            gracedb.upload.s(
                annotation_prefix + 'em_bright.json',
                superevent_id,
                message='Source properties copied from {}'.format(
                    preferred_event_id),
                tags=tags_for('em_bright')
            )
            |
            _create_label_and_return_filename.s(
                'EMBRIGHT_READY', superevent_id
            )
        ) if event['group'] == 'CBC' else identity.s(None),

        (
            gracedb.download.si('p_astro.json', preferred_event_id)
            |
            gracedb.upload.s(
                annotation_prefix + 'p_astro.json',
                superevent_id,
                message='Source classification copied from {}'.format(
                    preferred_event_id),
                tags=tags_for('p_astro')
            )
            |
            _create_label_and_return_filename.s(
                'PASTRO_READY', superevent_id
            )
        ) if event['group'] == 'CBC' else identity.s(None)
    )

    # Switch for disabling all but MDC alerts.
    if app.conf['only_alert_for_mdc']:
        if event.get('search') != 'MDC':
            # The signature must be immutable: a mutable one would receive
            # the canvas result as an extra first argument, shifting the
            # superevent ID and the message into the wrong parameters.
            canvas |= gracedb.upload.si(
                None, None, superevent_id,
                ("Skipping alert because gwcelery has been configured to only"
                 " send alerts for MDC events."))
            canvas.apply_async(priority=priority)
            return

    # Send GCN notice and upload GCN circular draft for online events.
    if is_publishable and initiate_voevent:
        canvas |= (
            _proceed_if_no_advocate_action.s(superevent_id)
            |
            preliminary_initial_update_alert.s(
                superevent_id,
                ('earlywarning' if 'EARLY_WARNING' in event['labels']
                 else 'preliminary'),
                labels=event['labels'])
        )

    canvas.apply_async(priority=priority)

614 

615 

@gracedb.task(shared=False)
def _get_lowest_far(superevent_id):
    """Return the smallest FAR among the events listed under ``gw_events``
    of the target superevent."""
    # FIXME: remove ._orig_run when this bug is fixed:
    # https://github.com/getsentry/sentry-python/issues/370
    superevent = gracedb.get_superevent._orig_run(superevent_id)
    fars = (gracedb.get_event._orig_run(gid)['far']
            for gid in superevent["gw_events"])
    return min(fars)

623 

624 

@app.task(ignore_result=True, shared=False)
def parameter_estimation(far_event, superevent_id):
    """Tasks for Parameter Estimation Followup with LALInference or Bilby

    For LALInference, this consists of the following steps:

    1.  Prepare and upload an ini file which is suitable for the target event.
    2.  Start Parameter Estimation if FAR is smaller than the PE threshold.

    For Bilby, this consists of the following steps:

    1.  Start Parameter Estimation if FAR is smaller than the PE threshold.
    2.  Upload of ini file during Parameter Estimation

    Parameters
    ----------
    far_event : tuple
        A pair ``(far, event)``: the FAR compared against the PE threshold
        and the preferred event dictionary.
    superevent_id : str
        The superevent ID.

    Notes
    -----
    Nothing is scheduled for events outside the CBC group, nor for MDC events
    when running against the production GraceDB host. If the FAR is above the
    threshold, a log message saying so is uploaded instead of starting
    parameter estimation.
    """
    far, event = far_event
    preferred_event_id = event['graceid']
    threshold = (app.conf['preliminary_alert_far_threshold']['cbc'] /
                 app.conf['preliminary_alert_trials_factor']['cbc'])
    is_production = (app.conf['gracedb_host'] == 'gracedb.ligo.org')
    is_mdc = (event['search'] == 'MDC')

    # FIXME: it will be better to start parameter estimation for 'burst'
    # events.
    if event['group'] != 'CBC' or (is_production and is_mdc):
        return

    canvas = inference.pre_pe_tasks(event, superevent_id)

    if far <= threshold:
        # FIXME: The second condition guarantees that the bilby for
        # playground or test events are started less than once per day to
        # save computational resources. Once bilby becomes quick enough, we
        # should drop that condition.
        run_bilby = is_production or (is_mdc and superevent_id[8:] == 'a')
        pipelines = ['lalinference', 'bilby'] if run_bilby \
            else ['lalinference']
        canvas |= group(
            inference.start_pe.s(preferred_event_id, superevent_id, pipeline)
            for pipeline in pipelines)
    else:
        template = ('FAR is larger than the PE threshold, '
                    '{} Hz. Parameter Estimation will not '
                    'start.')
        canvas |= gracedb.upload.si(
            filecontents=None, filename=None,
            graceid=superevent_id,
            message=template.format(threshold),
            tags='pe'
        )

    canvas.apply_async()

671 

672 

@gracedb.task(ignore_result=True, shared=False)
def preliminary_initial_update_alert(filenames, superevent_id, alert_type,
                                     labels=None):
    """
    Create and send a preliminary, initial, or update GCN notice.

    Parameters
    ----------
    filenames : tuple
        A list of the sky map, em_bright, and p_astro filenames. Any entry
        that is None is looked up in the superevent's GraceDB log. If
        `filenames` itself is None, nothing is done.
    superevent_id : str
        The superevent ID.
    alert_type : {'earlywarning', 'preliminary', 'initial', 'update'}
        The alert type.
    labels : list, optional
        A list of labels applied to superevent. Defaults to no labels.

    Notes
    -----
    This function is decorated with :obj:`gwcelery.tasks.gracedb.task` rather
    than :obj:`gwcelery.app.task` so that a synchronous call to
    :func:`gwcelery.tasks.gracedb.get_log` is retried in the event of GraceDB
    API failures. If `RAVEN_ALERT` is in labels will create a RAVEN circular.
    No notice is sent if `INJ` is in labels.

    """
    if filenames is None:
        return

    # Avoid a shared mutable default; None means "no labels".
    labels = [] if labels is None else labels

    if 'INJ' in labels:
        return

    skymap_filename, em_bright_filename, p_astro_filename = filenames
    skymap_needed = (skymap_filename is None)
    em_bright_needed = (em_bright_filename is None)
    p_astro_needed = (p_astro_filename is None)
    if skymap_needed or em_bright_needed or p_astro_needed:
        # Scan the whole log so that the last matching entry wins for each
        # missing file.
        for message in gracedb.get_log(superevent_id):
            f = message['filename']
            if not f:
                # Log entries without an attached file cannot match.
                continue
            t = message['tag_names']
            fv = '{},{}'.format(f, message['file_version'])
            if skymap_needed \
                    and {'sky_loc', 'public'}.issubset(t) \
                    and f.endswith('.fits.gz'):
                skymap_filename = fv
            if em_bright_needed \
                    and 'em_bright' in t \
                    and f.endswith('.json'):
                em_bright_filename = fv
            if p_astro_needed \
                    and 'p_astro' in t \
                    and f.endswith('.json'):
                p_astro_filename = fv

    if alert_type in {'earlywarning', 'preliminary', 'initial'}:
        if 'RAVEN_ALERT' in labels:
            circular_task = circulars.create_emcoinc_circular.si(superevent_id)
            circular_filename = '{}-emcoinc-circular.txt'.format(alert_type)
            tags = ['em_follow', 'ext_coinc']

        else:
            circular_task = circulars.create_initial_circular.si(superevent_id)
            circular_filename = '{}-circular.txt'.format(alert_type)
            tags = ['em_follow']

        circular_canvas = (
            circular_task
            |
            gracedb.upload.s(
                circular_filename,
                superevent_id,
                'Template for {} GCN Circular'.format(alert_type),
                tags=tags)
        )
    else:
        # Update alerts have no circular draft.
        circular_canvas = identity.s()

    # NOTE(review): em_bright_filename / p_astro_filename may still be None
    # here (nothing passed in and nothing found in the log), in which case
    # gracedb.download is asked for a file named None. Confirm whether that
    # is handled downstream or whether these downloads should be skipped.
    canvas = (
        group(
            gracedb.download.si(em_bright_filename, superevent_id),
            gracedb.download.si(p_astro_filename, superevent_id),
            gracedb.expose.s(superevent_id),
            *(
                gracedb.create_tag.s(filename, 'public', superevent_id)
                for filename in [
                    skymap_filename, em_bright_filename, p_astro_filename
                ]
                if filename is not None
            )
        )
        |
        _create_voevent.s(
            superevent_id,
            alert_type,
            skymap_filename=skymap_filename,
            internal=False,
            open_alert=True,
            raven_coinc=('RAVEN_ALERT' in labels)
        )
        |
        group(
            gracedb.download.s(superevent_id)
            |
            gcn.send.s()
            |
            (
                gracedb.create_label.si('GCN_PRELIM_SENT', superevent_id)
                if alert_type in {'earlywarning', 'preliminary'}
                else identity.si()
            ),

            circular_canvas,

            gracedb.create_tag.s('public', superevent_id)
        )
    )

    canvas.apply_async()

793 

794 

@gracedb.task(ignore_result=True, shared=False)
def initial_alert(filenames, superevent_id, labels=None):
    """Produce an initial alert.

    This does nothing more than call
    :meth:`~gwcelery.tasks.orchestrator.preliminary_initial_update_alert` with
    ``alert_type='initial'``.

    Parameters
    ----------
    filenames : tuple
        A list of the sky map, em_bright, and p_astro filenames.
    superevent_id : str
        The superevent ID.
    labels : list, optional
        A list of labels applied to superevent. Defaults to no labels.

    Notes
    -----
    This function is decorated with :obj:`gwcelery.tasks.gracedb.task` rather
    than :obj:`gwcelery.app.task` so that a synchronous call to
    :func:`gwcelery.tasks.gracedb.get_log` is retried in the event of GraceDB
    API failures.

    """
    # Avoid a shared mutable default; always forward a real list so the
    # callee's membership tests work.
    labels = [] if labels is None else labels
    preliminary_initial_update_alert(filenames, superevent_id, 'initial',
                                     labels=labels)

822 

823 

@gracedb.task(ignore_result=True, shared=False)
def update_alert(filenames, superevent_id):
    """Produce an update alert.

    This is a thin wrapper that delegates to
    :meth:`~gwcelery.tasks.orchestrator.preliminary_initial_update_alert` with
    ``alert_type='update'`` and no labels.

    Parameters
    ----------
    filenames : tuple
        A list of the sky map, em_bright, and p_astro filenames.
    superevent_id : str
        The superevent ID.

    Notes
    -----
    This function is decorated with :obj:`gwcelery.tasks.gracedb.task` rather
    than :obj:`gwcelery.app.task` so that a synchronous call to
    :func:`gwcelery.tasks.gracedb.get_log` is retried in the event of GraceDB
    API failures.

    """
    alert_type = 'update'
    preliminary_initial_update_alert(filenames, superevent_id, alert_type)

848 

849 

@app.task(ignore_result=True, shared=False)
def retraction_alert(superevent_id):
    """Produce a retraction alert.

    The superevent is exposed and a ``retraction`` VOEvent is created. Then,
    as three members of one group: the VOEvent is downloaded and sent to GCN,
    a retraction GCN Circular template is created and uploaded, and the
    VOEvent file is tagged ``public``.
    """
    create_retraction_voevent = _create_voevent.si(
        None, superevent_id, 'retraction',
        internal=False,
        open_alert=True
    )

    # Each of these receives the VOEvent filename from the previous step,
    # except the circular, whose first task is immutable.
    send_notice = gracedb.download.s(superevent_id) | gcn.send.s()
    upload_circular = (
        circulars.create_retraction_circular.si(superevent_id)
        |
        gracedb.upload.s(
            'retraction-circular.txt',
            superevent_id,
            'Template for retraction GCN Circular',
            tags=['em_follow']
        )
    )
    tag_voevent_public = gracedb.create_tag.s('public', superevent_id)

    (
        gracedb.expose.si(superevent_id)
        |
        create_retraction_voevent
        |
        group(send_notice, upload_circular, tag_voevent_public)
    ).apply_async()