Coverage for gwcelery/tasks/orchestrator.py: 94%

395 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-04-23 18:01 +0000

1"""Tasks that comprise the alert orchestrator. 

2 

3The orchestrator is responsible for the vetting and annotation workflow to 

4produce preliminary, initial, and update alerts for gravitational-wave event 

5candidates. 

6""" 

7import json 

8import re 

9 

10from astropy.time import Time 

11from celery import chain, group 

12from ligo.rrt_chat import channel_creation 

13from rapidpe_rift_pipe import pastro as rpe_pastro 

14 

15from .. import app 

16from . import (alerts, bayestar, circulars, detchar, em_bright, 

17 external_skymaps, gcn, gracedb, igwn_alert, inference, p_astro, 

18 rrt_utils, skymaps, superevents) 

19from .core import get_first, identity 

20 

21 

@igwn_alert.handler('superevent',
                    'mdc_superevent',
                    shared=False)
def handle_superevent(alert):
    """Schedule annotations and alert workflows for a superevent.

    The work that is launched depends on ``alert['alert_type']``:

    ``new``
        Schedule RapidPE and Bilby parameter estimation (delayed by the
        ``rapidpe_timeout`` and ``pe_timeout`` configuration values) and
        launch omega scans together with
        :meth:`gwcelery.tasks.detchar.check_vectors`.
    ``label_added``
        Start the workflow that corresponds to the label that was applied:
        less-significant, significant, or early-warning preliminary alerts
        through
        :meth:`~gwcelery.tasks.orchestrator.earlywarning_preliminary_alert`,
        the second preliminary alert, initial or retraction alerts, or the
        creation of a Mattermost channel.
    ``event_added``
        If the superevent carries the ``DQV`` label, rerun
        :meth:`gwcelery.tasks.detchar.check_vectors` and hand the result to
        :func:`_update_if_dqok`.

    Any other alert type is ignored.

    Parameters
    ----------
    alert : dict
        IGWN alert payload. This handler reads ``alert['uid']``,
        ``alert['alert_type']``, ``alert['object']`` and, depending on the
        alert type, ``alert['data']``.
    """
    superevent_id = alert['uid']
    # launch PE and detchar based on new type superevents
    if alert['alert_type'] == 'new':
        # Start RapidPE ``rapidpe_timeout`` seconds after t_0; the countdown
        # is clamped at zero if that moment is already in the past.
        timeout = max(
            alert['object']['t_0'] - Time.now().gps +
            app.conf['rapidpe_timeout'], 0
        )
        # Fetch the preferred event id after the countdown, then gather the
        # lowest CBC FAR and the preferred event dictionary for the PE task.
        (
            _get_preferred_event.si(superevent_id).set(
                countdown=timeout
            )
            |
            group(
                _get_cbc_lowest_far.si(superevent_id),
                gracedb.get_event.s()
            )
            |
            parameter_estimation.s(superevent_id, 'rapidpe')
        ).apply_async()

        # Same canvas for Bilby, delayed by the fixed ``pe_timeout``.
        (
            _get_preferred_event.si(superevent_id).set(
                countdown=app.conf['pe_timeout']
            )
            |
            group(
                _get_cbc_lowest_far.si(superevent_id),
                gracedb.get_event.s()
            )
            |
            parameter_estimation.s(superevent_id, 'bilby')
        ).apply_async()

        # run check_vectors. Create and upload omegascans
        group(
            detchar.omegascan.si(alert['object']['t_0'], superevent_id),

            detchar.check_vectors.si(
                alert['object']['preferred_event_data'],
                superevent_id,
                alert['object']['t_start'],
                alert['object']['t_end']
            )
        ).delay()

    elif alert['alert_type'] == 'label_added':
        label_name = alert['data']['name']
        # GraceDB query selecting the CBC and Burst events of this
        # superevent; narrowed to the MDC or Test category when applicable.
        query = f'superevent: {superevent_id} group: CBC Burst'
        if alert['object']['category'] == 'MDC':
            query += ' MDC'
        elif alert['object']['category'] == 'Test':
            query += ' Test'

        # launch less-significant preliminary alerts on LOW_SIGNIF_LOCKED
        if label_name == superevents.FROZEN_LABEL:
            # don't launch if EARLY_WARNING or SIGNIF_LOCKED is present
            skipping_labels = {
                superevents.SIGNIFICANT_LABEL,
                superevents.EARLY_WARNING_LABEL
            }.intersection(alert['object']['labels'])
            if skipping_labels:
                gracedb.upload.delay(
                    None, None, superevent_id,
                    "The superevent already has a significant/EW event, "
                    "skipping launching less-significant alert"
                )
                return
            # Log the DQ check, rerun check_vectors on the preferred event,
            # and feed its result into the less-significant alert.
            (
                gracedb.upload.s(
                    None,
                    None,
                    superevent_id,
                    "Automated DQ check before sending less-significant "
                    "preliminary alert. New results supersede old results.",
                    tags=['data_quality']
                )
                |
                detchar.check_vectors.si(
                    alert['object']['preferred_event_data'],
                    superevent_id,
                    alert['object']['t_start'],
                    alert['object']['t_end']
                )
                |
                earlywarning_preliminary_alert.s(
                    alert, alert_type='less-significant')
            ).apply_async()

        # launch significant alert on SIGNIF_LOCKED
        elif label_name == superevents.SIGNIFICANT_LABEL:
            # ensure superevent is locked before starting alert workflow
            r = chain()
            if superevents.FROZEN_LABEL not in alert['object']['labels']:
                r |= gracedb.create_label.si(superevents.FROZEN_LABEL,
                                             superevent_id)

            # Re-select the preferred event, record it on the superevent,
            # leave log messages, rerun check_vectors on the selected event
            # and then send the preliminary alert.
            r |= (
                gracedb.get_events.si(query)
                |
                superevents.select_preferred_event.s()
                |
                _update_superevent_and_return_event_dict.s(superevent_id)
                |
                _leave_log_message_and_return_event_dict.s(
                    superevent_id,
                    "Superevent cleaned up after significant event. "
                )
                |
                _leave_log_message_and_return_event_dict.s(
                    superevent_id,
                    "Automated DQ check before sending significant alert. "
                    "New results supersede old results.",
                    tags=['data_quality']
                )
                |
                detchar.check_vectors.s(
                    superevent_id,
                    alert['object']['t_start'],
                    alert['object']['t_end']
                )
                |
                earlywarning_preliminary_alert.s(
                    alert, alert_type='preliminary')
            )
            r.apply_async()

        # launch second preliminary on GCN_PRELIM_SENT
        elif label_name == 'GCN_PRELIM_SENT':
            (
                identity.si().set(
                    # https://git.ligo.org/emfollow/gwcelery/-/issues/478
                    # FIXME: remove this task once https://github.com/celery/celery/issues/7851 is resolved  # noqa: E501
                    countdown=app.conf['superevent_clean_up_timeout']
                )
                |
                gracedb.get_events.si(query)
                |
                superevents.select_preferred_event.s()
                |
                _update_superevent_and_return_event_dict.s(superevent_id)
                |
                group(
                    _leave_log_message_and_return_event_dict.s(
                        superevent_id,
                        "Superevent cleaned up after first preliminary alert"
                    ),

                    gracedb.create_label.si('DQR_REQUEST', superevent_id)
                )
                |
                # Keep only the first group result (the event dictionary).
                get_first.s()
                |
                earlywarning_preliminary_alert.s(
                    alert, alert_type='preliminary')
            ).apply_async()

            # set pipeline preferred events
            # FIXME: Ideally this should combined with the previous canvas.
            # However, incorporating that group prevents canvas from executing
            # maybe related to https://github.com/celery/celery/issues/7851
            (
                gracedb.get_events.si(query)
                |
                superevents.select_pipeline_preferred_event.s()
                |
                _set_pipeline_preferred_events.s(superevent_id)
            ).apply_async(countdown=app.conf['superevent_clean_up_timeout'])

        elif label_name == 'LOW_SIGNIF_PRELIM_SENT':
            # similar workflow as the GCN_PRELIM_SENT
            # except block by condition evaluated at the end of timeout
            _revise_and_send_second_less_significant_alert.si(
                alert, query, superevent_id,
            ).apply_async(countdown=app.conf['superevent_clean_up_timeout'])

        elif label_name == superevents.EARLY_WARNING_LABEL:
            if superevents.SIGNIFICANT_LABEL in alert['object']['labels']:
                # stop if full BW significant event already present
                gracedb.upload.delay(
                    None, None, superevent_id,
                    "Superevent superseded by full BW event, skipping EW."
                )
                return
            # start the EW alert pipeline; is blocked by SIGNIF_LOCKED
            # ensure superevent is locked before starting pipeline
            r = chain()
            if superevents.FROZEN_LABEL not in alert['object']['labels']:
                r |= gracedb.create_label.si(superevents.FROZEN_LABEL,
                                             superevent_id)

            r |= (
                gracedb.get_events.si(query)
                |
                superevents.select_preferred_event.s()
                |
                _update_superevent_and_return_event_dict.s(superevent_id)
                |
                _leave_log_message_and_return_event_dict.s(
                    superevent_id,
                    "Superevent cleaned up before sending EW alert."
                )
                |
                earlywarning_preliminary_alert.s(
                    alert, alert_type='earlywarning')
            )
            r.apply_async()

        # launch initial/retraction alert on ADVOK/ADVNO
        elif label_name == 'ADVOK':
            initial_alert((None, None, None), alert)
        elif label_name == 'ADVNO':
            retraction_alert(alert)
        elif label_name == 'ADVREQ':
            # Channel creation is opt-in through configuration.
            if app.conf['create_mattermost_channel']:
                _create_mattermost_channel.si(superevent_id).delay()

    # check DQV label on superevent, run check_vectors if required
    elif alert['alert_type'] == 'event_added':
        # FIXME Check if this should be changed to 'update' alert_types instead
        # of 'event_added'. 'event_added' seems like it's just a new event in
        # the superevent window, not necessarily an event that should be
        # promoted to preferred event
        start = alert['data']['t_start']
        end = alert['data']['t_end']

        if 'DQV' in gracedb.get_labels(superevent_id):
            (
                detchar.check_vectors.s(
                    alert['object']['preferred_event_data'],
                    superevent_id,
                    start,
                    end
                )
                |
                _update_if_dqok.s(superevent_id)
            ).apply_async()

272 

273 

@igwn_alert.handler('cbc_gstlal',
                    'cbc_spiir',
                    'cbc_pycbc',
                    'cbc_mbta',
                    shared=False)
def handle_cbc_event(alert):
    """Launch the annotations of a CBC event that rely on the
    pipeline-specific matched-filter parameter estimates.

    Notes
    -----
    Only alerts of type ``new`` trigger any work. Events uploaded to the
    search named by ``superevents.VT_SEARCH_NAME`` are never annotated. For
    the (pipeline, search) combinations listed in the body, this handler
    also generates the ``pipeline.p_astro.json`` file that the pipeline does
    not provide itself.

    The table below lists which files are created as a result of a new upload,
    and which tasks generate them.

    ============================== ==================================================
    File                           Task
    ============================== ==================================================
    ``bayestar.multiorder.fits``   :meth:`gwcelery.tasks.bayestar.localize`
    ``em_bright.json``             :meth:`gwcelery.tasks.em_bright.source_properties`
    ``pipeline.p_astro.json``      :meth:`gwcelery.tasks.p_astro.compute_p_astro`
    ============================== ==================================================

    Parameters
    ----------
    alert : dict
        IGWN alert payload; ``alert['object']`` is the event dictionary.
    """  # noqa: E501
    graceid = alert['uid']
    event = alert['object']
    pipeline = event['pipeline'].lower()
    search = event['search'].lower()

    # no annotations for events used in VT analysis
    if search == superevents.VT_SEARCH_NAME.lower():
        return

    priority = 0 if superevents.should_publish(event) else 1

    # All annotations below are produced once, when the event is uploaded.
    if alert['alert_type'] != 'new':
        return

    # Pipelines that use the GWCelery p-astro method
    # - spiir (all searches)
    # - pycbc for EarlyWarning search
    # - periodic MDC generated by first-two-years (based on gstlal)
    # FIXME: Remove this once all pipelines compute their own p-astro
    pipelines_stock_p_astro = {('spiir', 'earlywarning'),
                               ('pycbc', 'earlywarning'),
                               ('gstlal', 'mdc')}

    # Inputs shared by the em_bright and p_astro calculations; masses and
    # spins are read from the first SingleInspiral row.
    instruments = superevents.get_instruments_in_ranking_statistic(event)
    extra_attributes = event['extra_attributes']
    snr = superevents.get_snr(event)
    far = event['far']
    first_sngl = extra_attributes['SingleInspiral'][0]
    mass1 = first_sngl['mass1']
    mass2 = first_sngl['mass2']
    chi1 = first_sngl['spin1z']
    chi2 = first_sngl['spin2z']

    # FIXME: remove conditional when em-bright implementation exists
    if search != superevents.SUBSOLAR_SEARCH_NAME.lower():
        em_bright_canvas = (
            em_bright.source_properties.si(mass1, mass2, chi1, chi2, snr)
            |
            gracedb.upload.s(
                'em_bright.json', graceid,
                'em bright complete', ['em_bright', 'public']
            )
            |
            gracedb.create_label.si('EMBRIGHT_READY', graceid)
        )
        em_bright_canvas.apply_async(priority=priority)

    # p_astro calculation for pipelines that do not provide a
    # stock p_astro (upload pipeline.p_astro.json)
    if (pipeline, search) in pipelines_stock_p_astro:
        p_astro_canvas = (
            p_astro.compute_p_astro.s(
                snr, far, mass1, mass2, pipeline, instruments)
            |
            gracedb.upload.s(
                f'{pipeline}.p_astro.json', graceid,
                'p_astro computation complete', ['p_astro', 'public']
            )
            |
            gracedb.create_label.si('PASTRO_READY', graceid)
        )
        p_astro_canvas.apply_async(priority=priority)

    # Start BAYESTAR for all CBC pipelines.
    bayestar_canvas = (
        gracedb.download.s('coinc.xml', graceid)
        |
        bayestar.localize.s(graceid)
        |
        gracedb.upload.s(
            'bayestar.multiorder.fits', graceid,
            'sky localization complete', ['sky_loc', 'public']
        )
        |
        gracedb.create_label.si('SKYMAP_READY', graceid)
    )
    bayestar_canvas.apply_async(priority=priority)

379 

380 

@igwn_alert.handler('burst_olib',
                    'burst_cwb',
                    'burst_mly',
                    shared=False)
def handle_burst_event(alert):
    """Perform the pipeline-specific annotations of burst events.

    For a newly uploaded cWB BBH event an ``em_bright.json`` file is
    computed. For a ``log`` alert that announces a ``.fits.gz`` sky map, the
    file is converted into a ``.multiorder.fits`` file which is uploaded,
    after which the ``SKYMAP_READY`` label is applied.

    Parameters
    ----------
    alert : dict
        IGWN alert payload; ``alert['object']`` is the event dictionary.
    """  # noqa: E501
    graceid = alert['uid']
    pipeline = alert['object']['pipeline'].lower()
    search = alert['object']['search'].lower()
    priority = 0 if superevents.should_publish(alert['object']) else 1
    is_cwb_bbh = (pipeline, search) == ('cwb', 'bbh')

    # em_bright calculation for Burst-cWB-BBH
    if alert['alert_type'] == 'new' and is_cwb_bbh:
        multiburst = alert['object']['extra_attributes']['MultiBurst']
        snr = multiburst.get('snr')
        mchirp = multiburst.get('mchirp', 0.0)
        # FIXME once ingestion of mchirp is finalised
        # If mchirp is missing or zero, fall back to a fixed component mass
        # of 30.0; otherwise use the equal-mass component that corresponds
        # to the reported chirp mass.
        m12 = 30.0 if mchirp == 0.0 else 2**(0.2) * mchirp
        em_bright_canvas = (
            em_bright.source_properties.si(m12, m12, 0.0, 0.0, snr)
            |
            gracedb.upload.s(
                'em_bright.json', graceid,
                'em bright complete', ['em_bright', 'public']
            )
            |
            gracedb.create_label.si('EMBRIGHT_READY', graceid)
        )
        em_bright_canvas.apply_async(priority=priority)

    # Everything below reacts to file uploads, which arrive as log alerts.
    if alert['alert_type'] != 'log':
        return

    filename = alert['data']['filename']

    # The pipeline uploads a flat-resolution sky map file; convert it
    # to a multi-order one with the proper name.
    # FIXME: Remove block when CWB starts to upload skymaps
    # in multiorder format
    if not filename.endswith('.fits.gz'):
        return

    new_filename = filename.replace('.fits.gz', '.multiorder.fits')
    message_template = (
        'Multi-resolution FITS file created from '
        '<a href="/api/events/{graceid}/files/'
        '{filename}">{filename}</a>')
    flatten_msg = message_template.format(
        graceid=graceid, filename=filename)
    tags = ['sky_loc', 'lvem', 'public']
    unflatten_canvas = (
        gracedb.download.si(filename, graceid)
        |
        skymaps.unflatten.s(new_filename)
        |
        gracedb.upload.s(
            new_filename, graceid, flatten_msg, tags)
        |
        gracedb.create_label.si('SKYMAP_READY', graceid)
    )
    unflatten_canvas.apply_async(priority=priority)

445 

446 

@igwn_alert.handler('superevent',
                    'mdc_superevent',
                    shared=False)
def handle_posterior_samples(alert):
    """Generate multi-resolution FITS files, sky map annotations, and
    em-bright properties from an uploaded HDF5 file containing posterior
    samples.

    Only ``log`` alerts whose filename ends with ``.posterior_samples.hdf5``
    are processed; every other alert is ignored.

    Parameters
    ----------
    alert : dict
        IGWN alert payload. ``alert['data']`` provides the ``filename`` and
        ``comment`` of the upload, and ``alert['object']`` provides the
        ``preferred_event_data`` of the superevent.
    """
    if alert['alert_type'] != 'log' or \
            not alert['data']['filename'].endswith('.posterior_samples.hdf5'):
        return
    superevent_id = alert['uid']
    filename = alert['data']['filename']
    info = '{} {}'.format(alert['data']['comment'], filename)
    # Split on the LAST marker only: without maxsplit=1 a filename that
    # contains '.posterior_samples.' more than once yields more than two
    # parts and the unpacking raises ValueError.
    prefix, _ = filename.rsplit('.posterior_samples.', 1)
    skymap_filename = f'{prefix}.multiorder.fits'
    labels = ['pe', 'sky_loc']
    ifos = superevents.get_instruments(alert['object']['preferred_event_data'])

    # Sky map from the samples: annotate it and upload it in parallel.
    (
        gracedb.download.si(filename, superevent_id)
        |
        skymaps.skymap_from_samples.s(superevent_id, ifos)
        |
        group(
            skymaps.annotate_fits.s(
                skymap_filename, superevent_id, labels
            ),

            gracedb.upload.s(
                skymap_filename, superevent_id,
                'Multiresolution FITS file generated from "{}"'.format(info),
                labels
            )
        )
    ).delay()

    # em_bright from LALInference posterior samples
    (
        gracedb.download.si(filename, superevent_id)
        |
        em_bright.em_bright_posterior_samples.s()
        |
        gracedb.upload.s(
            '{}.em_bright.json'.format(prefix), superevent_id,
            'em-bright computed from "{}"'.format(info),
            'pe'
        )
    ).delay()

495 

496 

@app.task(bind=True, shared=False)
def _create_mattermost_channel(self, superevent_id):
    """Create the Mattermost channel for a superevent.

    The handler schedules this task when the ADVREQ label is applied. The
    channel is created by
    ``ligo.rrt_chat.channel_creation.rrt_channel_creation``, which receives
    the superevent id and the configured GraceDB host.

    Channel name : O4 RRT {superevent_id}

    Parameters
    ----------
    superevent_id : str
        The superevent id
    """
    channel_creation.rrt_channel_creation(
        superevent_id, self.app.conf['gracedb_host'])

513 

514 

@app.task(shared=False)
def _set_pipeline_preferred_events(pipeline_event, superevent_id):
    """Return group for setting pipeline preferred event using
    :meth:`gracedb.add_pipeline_preferred_event`.

    Parameters
    ----------
    pipeline_event: dict
        {pipeline: event_dict} key value pairs, returned by
        :meth:`superevents.select_pipeline_preferred_event`.

    superevent_id: str
        The superevent id

    Returns
    -------
    celery.group
        Group built from one entry per value of ``pipeline_event``.
    """
    # NOTE(review): ``gracedb.add_pipeline_preferred_event`` is invoked with
    # plain call syntax (not ``.s()``/``.si()``), so it appears to run
    # right here, inside this task, and the group is then built from the
    # return values of those calls rather than from signatures -- confirm
    # that this is intended.
    return group(
        gracedb.add_pipeline_preferred_event(superevent_id,
                                             event['graceid'])
        for event in pipeline_event.values()
    )

534 

535 

@app.task(shared=False, ignore_result=True)
def _update_if_dqok(event, superevent_id):
    """Update `preferred_event` of `superevent_id` to the given event if the
    `DQOK` label has been applied to the superevent.

    Parameters
    ----------
    event : dict
        Event dictionary; its ``graceid`` and ``gpstime`` become the new
        ``preferred_event`` and ``t_0`` of the superevent.
    superevent_id : str
        The superevent id
    """
    if 'DQOK' in gracedb.get_labels(superevent_id):
        event_id = event['graceid']
        gracedb.update_superevent(superevent_id,
                                  preferred_event=event_id,
                                  t_0=event["gpstime"])
        # The log text must be passed as ``message``, as at every other
        # ``gracedb.upload`` call site in this module; the previous
        # ``comment=`` keyword left the message argument unset.
        gracedb.upload.delay(
            None, None, superevent_id,
            message=f'DQOK applied based on new event {event_id}')

549 

550 

@gracedb.task(shared=False)
def _get_preferred_event(superevent_id):
    """Look up the preferred event of a superevent in GraceDB.

    This behaves like :func:`gwcelery.tasks.gracedb.get_superevent`, but
    only the ``preferred_event`` entry of the response is returned instead
    of the entire GraceDB JSON response.
    """
    # FIXME: remove ._orig_run when this bug is fixed:
    # https://github.com/getsentry/sentry-python/issues/370
    superevent = gracedb.get_superevent._orig_run(superevent_id)
    return superevent['preferred_event']

562 

563 

@gracedb.task(shared=False)
def _create_voevent(classification, *args, **kwargs):
    r"""Create a VOEvent record, merging classification JSON into the
    keyword arguments.

    Parameters
    ----------
    classification : tuple, None
        A collection of JSON strings, generated by
        :meth:`gwcelery.tasks.em_bright.source_properties` and
        :meth:`gwcelery.tasks.p_astro.compute_p_astro` or
        content of ``{gstlal,mbta}.p_astro.json`` uploaded
        by {gstlal,mbta} respectively; or None
    \*args
        Additional positional arguments passed to
        :meth:`gwcelery.tasks.gracedb.create_voevent`.
    \*\*kwargs
        Additional keyword arguments passed to
        :meth:`gwcelery.tasks.gracedb.create_voevent`.

    Returns
    -------
    str
        The filename of the newly created VOEvent.

    """
    voevent_kwargs = dict(kwargs)

    if classification is not None:
        # Merge source classification and source properties into the
        # keyword arguments; None entries carry no content and are skipped.
        for text in classification:
            if text is None:
                continue
            voevent_kwargs.update(json.loads(text))

    # FIXME: These keys differ between em_bright.json
    # and the GraceDB REST API.
    renamed_keys = (('HasNS', 'ProbHasNS'),
                    ('HasRemnant', 'ProbHasRemnant'))
    for json_key, api_key in renamed_keys:
        if json_key in voevent_kwargs:
            voevent_kwargs[api_key] = voevent_kwargs.pop(json_key)

    # Derive the sky map type from the file name by stripping the FITS
    # extension (and any trailing version suffix), unless given explicitly.
    skymap_filename = voevent_kwargs.get('skymap_filename')
    if skymap_filename is not None:
        voevent_kwargs.setdefault(
            'skymap_type',
            re.sub(r'(\.multiorder)?\.fits(\..+)?(,[0-9]+)?$', '',
                   skymap_filename))

    # FIXME: remove ._orig_run when this bug is fixed:
    # https://github.com/getsentry/sentry-python/issues/370
    return gracedb.create_voevent._orig_run(*args, **voevent_kwargs)

619 

620 

@app.task(shared=False)
def _create_label_and_return_filename(filename, label, graceid):
    """Schedule the creation of `label` on `graceid` and pass `filename`
    through unchanged so that it can be used by the next task in a chain.
    """
    gracedb.create_label.delay(label, graceid)
    return filename

625 

626 

@app.task(shared=False)
def _leave_log_message_and_return_event_dict(event, superevent_id,
                                             message, **kwargs):
    """Schedule a :meth:`gracedb.upload` of a file-less log message on the
    superevent and hand the event dictionary back unchanged.

    Extra keyword arguments (e.g. ``tags``) are forwarded to the upload.
    """
    gracedb.upload.delay(None, None, superevent_id, message, **kwargs)
    return event

635 

636 

@gracedb.task(shared=False)
def _update_superevent_and_return_event_dict(event, superevent_id):
    """Call :meth:`gracedb.update_superevent` with the ``graceid`` and
    ``gpstime`` of `event` as the new ``preferred_event`` and ``t_0``, then
    hand the event dictionary back unchanged.
    """
    new_preferred_event = event['graceid']
    new_t_0 = event['gpstime']
    gracedb.update_superevent(superevent_id,
                              preferred_event=new_preferred_event,
                              t_0=new_t_0)
    return event

646 

647 

@gracedb.task(shared=False)
def _proceed_if_not_blocked_by(files, superevent_id, block_by):
    """Pass `files` through unless the superevent carries a blocking label.

    Parameters
    ----------
    files : tuple
        List of files
    superevent_id : str
        The superevent id corresponding to files
    block_by : set
        Set of blocking labels. E.g. `{'ADVOK', 'ADVNO'}`

    Returns
    -------
    tuple or None
        `files` if none of the labels in `block_by` is present on the
        superevent. Otherwise None, after scheduling a log message that
        names the blocking labels.
    """
    blocking_labels = block_by.intersection(
        gracedb.get_labels(superevent_id))
    if not blocking_labels:
        return files

    gracedb.upload.delay(
        None, None, superevent_id,
        f"Blocking automated notice due to labels {blocking_labels}"
    )
    return None

671 

672 

@gracedb.task(shared=False)
def _revise_and_send_second_less_significant_alert(alert, query,
                                                   superevent_id):
    """Clean up the superevent and send the second less-significant alert.

    Nothing is done if the superevent already carries an advocate label
    (``ADVREQ``, ``ADVOK``, ``ADVNO``), the significant label, or the
    early-warning label at the time this task runs.

    Parameters
    ----------
    alert : dict
        IGWN-Alert dictionary
    query : str
        GraceDB query passed to :meth:`gracedb.get_events` to fetch the
        candidate events.
    superevent_id : str
        The superevent id
    """
    current_labels = gracedb.get_labels(superevent_id)
    vetoing_labels = {
        'ADVREQ', 'ADVOK', 'ADVNO',
        superevents.SIGNIFICANT_LABEL,
        superevents.EARLY_WARNING_LABEL,
    }
    if vetoing_labels.intersection(current_labels):
        return

    # Re-select the preferred event, record it on the superevent, leave a
    # log message, and send the less-significant alert.
    alert_canvas = (
        gracedb.get_events.si(query)
        |
        superevents.select_preferred_event.s()
        |
        _update_superevent_and_return_event_dict.s(superevent_id)
        |
        _leave_log_message_and_return_event_dict.s(
            superevent_id,
            "Superevent cleaned up before second less-significant alert"
        )
        |
        earlywarning_preliminary_alert.s(
            alert, alert_type='less-significant')
    )
    alert_canvas.delay()

    # set pipeline preferred events
    # FIXME: Ideally this should combined with the previous canvas.
    # However, incorporating that group prevents canvas from executing
    # maybe related to https://github.com/celery/celery/issues/7851
    pipeline_preferred_canvas = (
        gracedb.get_events.si(query)
        |
        superevents.select_pipeline_preferred_event.s()
        |
        _set_pipeline_preferred_events.s(superevent_id)
    )
    pipeline_preferred_canvas.delay()

712 

713 

@app.task(shared=False)
def _annotate_fits_and_return_input(input_list, superevent_id):
    """Annotate the sky map produced by the download group at the start of
    the :meth:`~gwcelery.tasks.orchestrator.earlywarning_preliminary_alert`
    canvas by calling :meth:`~gwcelery.tasks.skymaps.annotate_fits_tuple`,
    then hand the group output back unchanged.

    Parameters
    ----------
    input_list : list
        The output of the group that downloads the skymap, embright, and
        p-astro files. This list is in the form [skymap, skymap_filename],
        [em_bright, em_bright_filename], [p_astro_dict, p_astro_filename],
        though the em-bright and p-astro lists can be populated by Nones
    superevent_id : str
        The superevent id that the annotations are made for.

    Returns
    -------
    list
        `input_list`, unmodified.
    """
    # Only the first entry, the sky map pair, is annotated.
    skymap_entry = input_list[0]
    annotation_tags = ['sky_loc', 'public']
    skymaps.annotate_fits_tuple(skymap_entry, superevent_id, annotation_tags)

    return input_list

740 

741 

@gracedb.task(shared=False)
def _unpack_args_and_send_earlywarning_preliminary_alert(input_list, alert,
                                                         alert_type):
    """Split the output of the download group at the start of the
    :meth:`~gwcelery.tasks.orchestrator.earlywarning_preliminary_alert`
    canvas into file names and file contents, and schedule
    :meth:`gwcelery.tasks.orchestrator.earlywarning_preliminary_initial_update_alert`.

    Parameters
    ----------
    input_list : list
        The output of the group that downloads the skymap, embright, and
        p-astro files. This list is in the form [skymap, skymap_filename],
        [em_bright, em_bright_filename], [p_astro_dict, p_astro_filename],
        though the em-bright and p-astro lists can be populated by Nones.
        None means that the alert is blocked and nothing is sent.
    alert : dict
        IGWN-Alert dictionary
    alert_type : str
        alert_type passed to
        :meth:`earlywarning_preliminary_initial_update_alert`
    """
    # alert is blocked by blocking labels
    if input_list is None:
        return

    skymap_entry, em_bright_entry, p_astro_entry = input_list
    skymap, skymap_filename = skymap_entry
    em_bright_contents, em_bright_filename = em_bright_entry
    p_astro_contents, p_astro_filename = p_astro_entry

    # Update to latest state after downloading files
    superevent = gracedb.get_superevent(alert['object']['superevent_id'])

    filenames = [skymap_filename, em_bright_filename, p_astro_filename]
    contents = [skymap, em_bright_contents, p_astro_contents]
    earlywarning_preliminary_initial_update_alert.delay(
        filenames, superevent, alert_type, filecontents=contents
    )

779 

780 

@app.task(ignore_result=True, shared=False)
def earlywarning_preliminary_alert(event, alert, alert_type='preliminary',
                                   initiate_voevent=True):
    """Produce a preliminary alert by copying any sky maps.

    This consists of the following steps:

    1.   Copy any sky maps and source classification from the preferred event
         to the superevent.
    2.   Create standard annotations for sky maps including all-sky plots by
         calling :meth:`gwcelery.tasks.skymaps.annotate_fits`.
    3.   Create a preliminary VOEvent.
    4.   Send the VOEvent to GCN and notices to SCiMMA and GCN.
    5.   Apply the GCN_PRELIM_SENT or LOW_SIGNIF_PRELIM_SENT
         depending on the significant or less-significant alert
         respectively.
    6.   Create and upload a GCN Circular draft.

    Steps 1 and 2 are built in this function; the remaining steps are
    delegated to the tasks appended at the end of the canvas.

    Parameters
    ----------
    event : dict
        Preferred event dictionary. The keys ``graceid``, ``group``,
        ``pipeline``, ``search`` and ``labels`` are read here.
    alert : dict
        IGWN-Alert dictionary; ``alert['uid']`` is the superevent id.
    alert_type : str
        One of ``'preliminary'``, ``'earlywarning'`` or
        ``'less-significant'``; selects the set of blocking labels.
    initiate_voevent : bool
        If False, files are copied and annotated but no alert is sent.

    Raises
    ------
    NotImplementedError
        If the event is neither a cWB BBH event nor in the CBC or Burst
        group, so that no sky map file name can be determined.
    """
    priority = 0 if superevents.should_publish(event) else 1
    preferred_event_id = event['graceid']
    superevent_id = alert['uid']

    # Define alert payloads depending on group-pipeline-search
    # cbc-*-*       :  p_astro/em_bright
    # burst.cwb-bbh :  p_astro/em_bright
    # burst-*-*     :  NO p_astro/em_bright
    alert_group = event['group'].lower()
    alert_pipeline = event['pipeline'].lower()
    alert_search = event['search'].lower()
    if alert_pipeline == 'cwb' and alert_search == 'bbh':
        skymap_filename = alert_pipeline + '.multiorder.fits'
        p_astro_filename = alert_pipeline + '.p_astro.json'
        em_bright_filename = 'em_bright.json'
    elif alert_group == 'cbc':
        skymap_filename = 'bayestar.multiorder.fits'
        p_astro_filename = alert_pipeline + '.p_astro.json'
        em_bright_filename = 'em_bright.json'
    elif alert_group == 'burst':
        skymap_filename = event['pipeline'].lower() + '.multiorder.fits'
        p_astro_filename = None
        em_bright_filename = None
    else:
        raise NotImplementedError(
            'Valid skymap required for preliminary alert'
        )

    # Determine if the event should be made public.
    is_publishable = (superevents.should_publish(
                      event, significant=alert_type != 'less-significant') and
                      {'DQV', 'INJ'}.isdisjoint(
                      event['labels']))

    # Download files from events and upload to superevent with relevant
    # annotations. Pass file contents down the chain so alerts task doesn't
    # need to download them again.
    # Note: this is explicitly made a chain to fix an issue described in #464.
    # Each download branch yields a two-element result (contents, uploaded
    # file name); branches without a file yield [None, None] instead.
    canvas = chain(
        group(
            gracedb.download.si(skymap_filename, preferred_event_id)
            |
            group(
                identity.s(),

                gracedb.upload.s(
                    skymap_filename,
                    superevent_id,
                    message='Localization copied from {}'.format(
                        preferred_event_id),
                    tags=['sky_loc', 'public']
                )
                |
                _create_label_and_return_filename.s('SKYMAP_READY',
                                                    superevent_id)
            ),

            (
                gracedb.download.si(em_bright_filename, preferred_event_id)
                |
                group(
                    identity.s(),

                    gracedb.upload.s(
                        em_bright_filename,
                        superevent_id,
                        message='Source properties copied from {}'.format(
                            preferred_event_id),
                        tags=['em_bright', 'public']
                    )
                    |
                    _create_label_and_return_filename.s('EMBRIGHT_READY',
                                                        superevent_id)
                )
            ) if em_bright_filename is not None else
            identity.s([None, None]),

            (
                gracedb.download.si(p_astro_filename, preferred_event_id)
                |
                group(
                    identity.s(),

                    gracedb.upload.s(
                        p_astro_filename,
                        superevent_id,
                        message='Source classification copied from {}'.format(
                            preferred_event_id),
                        tags=['p_astro', 'public']
                    )
                    |
                    _create_label_and_return_filename.s('PASTRO_READY',
                                                        superevent_id)
                )
            ) if p_astro_filename is not None else
            identity.s([None, None])
        )
        |
        # Need annotate skymap task in body of chord instead of header because
        # this task simply calls another task, which is to be avoided in chord
        # headers. Note that any group that chains to a task is automatically
        # upgraded to a chord.
        _annotate_fits_and_return_input.s(superevent_id)
    )

    # Switch for disabling all but MDC alerts.
    if app.conf['only_alert_for_mdc']:
        if event.get('search') != 'MDC':
            # Files are still copied and annotated, but a log message is
            # left in place of the alert.
            canvas |= gracedb.upload.si(
                None, None, superevent_id,
                ("Skipping alert because gwcelery has been configured to only"
                 " send alerts for MDC events."))
            canvas.apply_async(priority=priority)
            return

    # Send notice and upload GCN circular draft for online events.
    if is_publishable and initiate_voevent:
        # presence of advocate action blocks significant prelim alert
        # presence of adv action or significant event blocks EW alert
        # presence of adv action or significant event or EW event blocks
        # less significant alert
        blocking_labels = (
            {'ADVOK', 'ADVNO'} if alert_type == 'preliminary'
            else
            {superevents.SIGNIFICANT_LABEL, 'ADVOK', 'ADVNO'}
            if alert_type == 'earlywarning'
            else
            {superevents.EARLY_WARNING_LABEL, superevents.SIGNIFICANT_LABEL,
             'ADVOK', 'ADVNO'}
            if alert_type == 'less-significant'
            else
            set()
        )
        # The label check runs after the files are in place; when it
        # returns None the sending task exits without sending.
        canvas |= (
            _proceed_if_not_blocked_by.s(superevent_id, blocking_labels)
            |
            _unpack_args_and_send_earlywarning_preliminary_alert.s(
                alert, alert_type
            )
        )

    canvas.apply_async(priority=priority)

941 

942 

@gracedb.task(shared=False)
def _get_cbc_lowest_far(superevent_id):
    """Return the smallest FAR among the CBC events of the superevent, or
    None if the superevent contains no CBC event.
    """
    # FIXME: remove ._orig_run when this bug is fixed:
    # https://github.com/getsentry/sentry-python/issues/370
    superevent = gracedb.get_superevent._orig_run(superevent_id)
    events = [
        gracedb.get_event._orig_run(gid)
        for gid in superevent["gw_events"]
    ]
    cbc_fars = [
        candidate['far'] for candidate in events
        if candidate['group'].lower() == 'cbc'
    ]
    return min(cbc_fars, default=None)

956 

957 

@app.task(ignore_result=True, shared=False)
def parameter_estimation(far_event, superevent_id, pe_pipeline):
    """Start parameter estimation, or log why it will not be started.

    Parameter estimation (Bilby or RapidPE-RIFT, selected by ``pe_pipeline``)
    is launched through :meth:`gwcelery.tasks.inference.start_pe` only when
    none of the exclusion rules below applies. Otherwise a single log message
    tagged ``pe`` is uploaded to the superevent explaining the reason.

    Exclusion rules, checked in this order:

    1. the event group is not CBC;
    2. the FAR is larger than the PE threshold (the significant-alert FAR
       threshold of the event's search divided by its trials factor; for a
       search that is not listed the threshold is minus infinity);
    3. the search is MDC;
    4. the event is flagged ``offline``;
    5. the event comes from MBTA and the GraceDB host is the playground
       server;
    6. ``pe_pipeline`` is ``rapidpe`` and the search is early warning.

    Parameters
    ----------
    far_event : tuple
        A ``(far, event)`` pair: the FAR to compare against the threshold and
        the event dictionary.
    superevent_id : str
        The superevent ID that receives the log message.
    pe_pipeline : str
        The parameter estimation pipeline to start.
    """
    far, event = far_event
    event_group = event['group'].lower()
    search = event['search'].lower()

    far_thresholds = app.conf['significant_alert_far_threshold']['cbc']
    if search in far_thresholds:
        trials_factors = app.conf['significant_alert_trials_factor']['cbc']
        threshold = far_thresholds[search] / trials_factors[search]
    else:
        # Fallback in case an event is uploaded to an unlisted search
        threshold = -1 * float('inf')

    # Work out why PE must be skipped; None means that PE should start.
    if event_group != 'cbc':
        skip_reason = (
            'Parameter estimation will not start since this is not '
            'CBC but {}.'
        ).format(event['group'])
    elif far > threshold:
        skip_reason = (
            'Parameter estimation will not start since FAR is larger '
            'than the PE threshold, {} Hz.'
        ).format(threshold)
    elif search == 'mdc':
        skip_reason = (
            'Parameter estimation will not start since parameter '
            'estimation is disabled for mock uploads.'
        )
    elif event.get('offline', False):
        skip_reason = (
            'Parameter estimation will not start since parameter '
            'estimation is disabled for OFFLINE events.'
        )
    elif (
        app.conf['gracedb_host'] == 'gracedb-playground.ligo.org'
        and event['pipeline'] == 'MBTA'
    ):
        # FIXME: Remove this block once multiple channels can be handled on
        # one gracedb instance
        skip_reason = (
            'Parameter estimation is disabled for MBTA triggers '
            'on playground as MBTA analyses live data + online '
            'injections not O3 replay data + MDC injections'
        )
    elif pe_pipeline == 'rapidpe' and search == 'earlywarning':
        # Remove this if rapidpe can ingest early warning events
        skip_reason = (
            'Parameter estimation by RapidPE-RIFT is disabled for '
            'earlywarning triggers.'
        )
    else:
        skip_reason = None

    if skip_reason is None:
        inference.start_pe.delay(event, superevent_id, pe_pipeline)
        return

    gracedb.upload.delay(
        filecontents=None, filename=None,
        graceid=superevent_id,
        message=skip_reason,
        tags='pe'
    )

1036 

1037 

@gracedb.task(shared=False)
def earlywarning_preliminary_initial_update_alert(
    filenames,
    superevent,
    alert_type,
    filecontents=None
):
    """
    Create a canvas that sends an earlywarning, preliminary, initial, or update
    notice.

    Nothing is scheduled when the superevent carries the ``INJ`` label.

    Parameters
    ----------
    filenames : tuple
        A list of the sky map, em_bright, and p_astro filenames. An entry
        that is None is looked up in the superevent's GraceDB log.
    superevent : dict
        The superevent dictionary, typically obtained from an IGWN Alert or
        from querying GraceDB.
    alert_type : {'less-significant', 'earlywarning', 'preliminary', 'initial', 'update'}  # noqa: E501
        The alert type.
    filecontents : tuple, optional
        The sky map, em_bright, and p_astro contents. Only accepted for
        less-significant, earlywarning, and preliminary alerts. When given
        and no combined sky map is involved, these contents are used
        directly instead of downloading em_bright and p_astro from GraceDB.

    Notes
    -----
    Tasks that call this function should be decorated with
    :obj:`gwcelery.tasks.gracedb.task` rather than :obj:`gwcelery.app.task` so
    that a synchronous call to :func:`gwcelery.tasks.gracedb.get_log` is
    retried in the event of GraceDB API failures. If `EM_COINC` is in labels
    will create a RAVEN circular.

    """
    labels = superevent['labels']
    superevent_id = superevent['superevent_id']

    # Superevents labelled INJ get no alert: return before building anything.
    if 'INJ' in labels:
        return

    # Pre-fetched file contents are only accepted for these alert types.
    if filecontents:
        assert alert_type in {
            'less-significant', 'earlywarning', 'preliminary'}

    skymap_filename, em_bright_filename, p_astro_filename = filenames
    rapidpe_pastro_filename = None
    rapidpe_pastro_needed = True
    combined_skymap_filename = None
    combined_skymap_needed = False
    # A filename passed as None is searched for in the GraceDB log below.
    skymap_needed = (skymap_filename is None)
    em_bright_needed = (em_bright_filename is None)
    p_astro_needed = (p_astro_filename is None)
    # A coincidence is only reported when the RAVEN_ALERT label is present
    # and em_type is non-empty; em_type is used below as the GraceDB ID of
    # the external event.
    raven_coinc = ('RAVEN_ALERT' in labels and bool(superevent['em_type']))
    if raven_coinc:
        ext_labels = gracedb.get_labels(superevent['em_type'])
        combined_skymap_needed = \
            {"RAVEN_ALERT", "COMBINEDSKYMAP_READY"}.issubset(set(ext_labels))

    # FIXME: This if statement is always True, we should get rid of it.
    if skymap_needed or em_bright_needed or p_astro_needed or \
            combined_skymap_needed or rapidpe_pastro_needed:
        # There is no break in this loop: a later log entry overwrites an
        # earlier match, so for each file kind the last matching entry in the
        # order returned by get_log is the one that is kept.
        for message in gracedb.get_log(superevent_id):
            t = message['tag_names']
            f = message['filename']
            v = message['file_version']
            # Versioned file name of the form 'filename,version'
            fv = '{},{}'.format(f, v)
            # Skip log entries that have no file attached
            if not f:
                continue
            if skymap_needed \
                    and {'sky_loc', 'public'}.issubset(t) \
                    and f.endswith('.multiorder.fits') \
                    and 'combined' not in f:
                skymap_filename = fv
            if em_bright_needed \
                    and 'em_bright' in t \
                    and f.endswith('.json'):
                em_bright_filename = fv
            if p_astro_needed \
                    and {'public'}.issubset(t) \
                    and 'p_astro' in t \
                    and f.endswith('.json'):
                p_astro_filename = fv
            if combined_skymap_needed \
                    and {'sky_loc', 'ext_coinc'}.issubset(t) \
                    and f.startswith('combined-ext.') \
                    and 'fit' in f:
                combined_skymap_filename = fv
            if rapidpe_pastro_needed \
                    and 'p_astro' in t \
                    and f == 'RapidPE_RIFT.p_astro.json':
                rapidpe_pastro_filename = fv

    if combined_skymap_needed:
        # for every alert, copy combined sky map over if applicable
        # FIXME: use file inheritance once available
        ext_id = superevent['em_type']
        if combined_skymap_filename:
            # If previous sky map, increase version by 1
            combined_skymap_filename_base, v = \
                combined_skymap_filename.split(',')
            v = str(int(v) + 1)
            combined_skymap_filename = \
                combined_skymap_filename_base + ',' + v
        else:
            # NOTE(review): skymap_filename is still None here if it was not
            # passed in and no log entry matched; the `in` test below would
            # then raise TypeError. Confirm whether that case can occur.
            combined_skymap_filename_base = \
                (external_skymaps.COMBINED_SKYMAP_FILENAME_MULTIORDER
                 if '.multiorder.fits' in skymap_filename else
                 external_skymaps.COMBINED_SKYMAP_FILENAME_FLAT)
            combined_skymap_filename = combined_skymap_filename_base + ',0'
        message = 'Combined LVK-external sky map copied from {0}'.format(
            ext_id)
        # NOTE(review): the link is built under /api/events/ although se_id
        # is a superevent ID; confirm that this path resolves on GraceDB.
        message_png = (
            'Mollweide projection of <a href="/api/events/{se_id}/files/'
            '{filename}">{filename}</a>, copied from {ext_id}').format(
               se_id=superevent_id,
               ext_id=ext_id,
               filename=combined_skymap_filename)

        # Two chains in one group: copy the combined sky map file from the
        # external event to the superevent and then apply the
        # COMBINEDSKYMAP_READY label; copy the PNG likewise.
        combined_skymap_canvas = group(
            gracedb.download.si(combined_skymap_filename_base, ext_id)
            |
            gracedb.upload.s(
                combined_skymap_filename_base, superevent_id,
                message, ['sky_loc', 'ext_coinc', 'public'])
            |
            gracedb.create_label.si('COMBINEDSKYMAP_READY', superevent_id),

            gracedb.download.si(external_skymaps.COMBINED_SKYMAP_FILENAME_PNG,
                                ext_id)
            |
            gracedb.upload.s(
                external_skymaps.COMBINED_SKYMAP_FILENAME_PNG, superevent_id,
                message_png, ['sky_loc', 'ext_coinc', 'public']
            )
            |
            # Pass None to download_anor_expose group
            identity.si()
        )

    # circular template not needed for less-significant alerts
    # (nor is one created for update alerts, which take the else branch too)
    if alert_type in {'earlywarning', 'preliminary', 'initial'}:
        if raven_coinc:
            circular_task = circulars.create_emcoinc_circular.si(superevent_id)
            circular_filename = '{}-emcoinc-circular.txt'.format(alert_type)
            tags = ['em_follow', 'ext_coinc']

        else:
            circular_task = circulars.create_initial_circular.si(superevent_id)
            circular_filename = '{}-circular.txt'.format(alert_type)
            tags = ['em_follow']

        # Create the circular text, then upload it to the superevent
        circular_canvas = (
            circular_task
            |
            gracedb.upload.s(
                circular_filename,
                superevent_id,
                'Template for {} GCN Circular'.format(alert_type),
                tags=tags)
        )

    else:
        circular_canvas = identity.si()

    # less-significant alerts have "preliminary" voevent notice type
    alert_type_voevent = 'preliminary' if alert_type == 'less-significant' \
        else alert_type
    # set the significant field in the VOEvent based on
    # less-significant/significant alert.
    # For kafka alerts the analogous field is set in alerts.py.
    # (see comment before defining kafka_alert_canvas)
    voevent_significance = 0 if alert_type == 'less-significant' else 1

    # Branch 1: the caller supplied the file contents and no combined sky map
    # is involved, so em_bright and p_astro need not be downloaded.
    if filecontents and not combined_skymap_filename:
        skymap, em_bright, p_astro_dict = filecontents

        # check high profile and apply label if true
        if alert_type == 'preliminary':
            high_profile_canvas = rrt_utils.check_high_profile.si(
                skymap, em_bright, p_astro_dict, superevent
            )
        else:
            high_profile_canvas = identity.si()

        download_andor_expose_group = []
        if rapidpe_pastro_filename is None:
            # No RapidPE-RIFT p_astro file in the log: the VOEvent and the
            # kafka alert are immutable signatures built from the supplied
            # contents.
            voevent_canvas = _create_voevent.si(
                (em_bright, p_astro_dict),
                superevent_id,
                alert_type_voevent,
                Significant=voevent_significance,
                skymap_filename=skymap_filename,
                internal=False,
                open_alert=True,
                raven_coinc=raven_coinc,
                combined_skymap_filename=combined_skymap_filename
            )
            # Placeholder only: rapidpe_canvas is appended to the canvas
            # below only when rapidpe_pastro_filename is truthy, which it is
            # not in this branch.
            rapidpe_canvas = _update_rapidpe_pastro_shouldnt_run.s()

            # kafka alerts have a field called "significant" based on
            # https://dcc.ligo.org/LIGO-G2300151/public
            # The alert_type value passed to alerts.send is used to
            # set this field in the alert dictionary
            kafka_alert_canvas = alerts.send.si(
                (skymap, em_bright, p_astro_dict),
                superevent,
                alert_type,
                raven_coinc=raven_coinc
            )
        else:
            # A RapidPE-RIFT p_astro file exists. The VOEvent and kafka
            # signatures are partial (.s): their first argument is the
            # result of the preceding task in the canvas (rapidpe_canvas).
            voevent_canvas = _create_voevent.s(
                superevent_id,
                alert_type_voevent,
                Significant=voevent_significance,
                skymap_filename=skymap_filename,
                internal=False,
                open_alert=True,
                raven_coinc=raven_coinc,
                combined_skymap_filename=combined_skymap_filename
            )
            download_andor_expose_group += [
                gracedb.download.si(rapidpe_pastro_filename, superevent_id)
            ]

            kafka_alert_canvas = _check_pastro_and_send_alert.s(
                skymap,
                em_bright,
                superevent,
                alert_type,
                raven_coinc=raven_coinc
            )

            # em_bright and the pipeline p_astro are passed as keywords here
            # because only the RapidPE-RIFT file is downloaded by the group.
            rapidpe_canvas = (
                _update_rapidpe_pastro.s(
                    em_bright=em_bright,
                    pipeline_pastro=p_astro_dict)
                |
                _upload_rapidpe_pastro_json.s(
                    superevent_id,
                    rapidpe_pastro_filename
                )
            )
    else:
        # Branch 2: no usable file contents (or a combined sky map exists).
        # Download em_bright and p_astro files here for voevent
        download_andor_expose_group = [
            gracedb.download.si(em_bright_filename, superevent_id) if
            em_bright_filename is not None else identity.s(None),
            gracedb.download.si(p_astro_filename, superevent_id) if
            p_astro_filename is not None else identity.s(None),
        ]
        high_profile_canvas = identity.si()

        voevent_canvas = _create_voevent.s(
            superevent_id,
            alert_type_voevent,
            Significant=voevent_significance,
            skymap_filename=skymap_filename,
            internal=False,
            open_alert=True,
            raven_coinc=raven_coinc,
            combined_skymap_filename=combined_skymap_filename
        )

        if rapidpe_pastro_filename:
            # The RapidPE-RIFT download is the third entry of the group,
            # after the em_bright and p_astro entries above.
            download_andor_expose_group += [
                gracedb.download.si(rapidpe_pastro_filename, superevent_id)
            ]

            rapidpe_canvas = (
                _update_rapidpe_pastro.s()
                |
                _upload_rapidpe_pastro_json.s(
                    superevent_id,
                    rapidpe_pastro_filename
                )
            )

        # The skymap has not been downloaded at this point, so we need to
        # download it before we can assemble the kafka alerts and send them
        kafka_alert_canvas = alerts.download_skymap_and_send_alert.s(
            superevent,
            alert_type,
            skymap_filename=skymap_filename,
            raven_coinc=raven_coinc,
            combined_skymap_filename=combined_skymap_filename
        )

    to_expose = [skymap_filename, em_bright_filename, p_astro_filename]
    # Since PE skymap images, HTML, and gzip FITS are not made public when they
    # are uploaded, we need to expose them here.
    if (
        skymap_filename is not None and 'bilby' in skymap_filename.lower()
    ):
        prefix, _, _ = skymap_filename.partition('.multiorder.fits')
        to_expose += [f'{prefix}.html', f'{prefix}.png',
                      f'{prefix}.volume.png', f'{prefix}.fits.gz']
    # Expose the superevent and tag every known file as public; these tasks
    # are appended after the download entries of the group.
    download_andor_expose_group += [
        gracedb.expose.si(superevent_id),
        *(
            gracedb.create_tag.si(filename, 'public', superevent_id)
            for filename in to_expose if filename is not None
        )
    ]

    # After the VOEvent is created: download it and send it via gcn.send,
    # and tag it as public.
    voevent_canvas |= group(
        gracedb.download.s(superevent_id)
        |
        gcn.send.s(),

        gracedb.create_tag.s('public', superevent_id)
    )

    if combined_skymap_needed:
        download_andor_expose_group += [combined_skymap_canvas]

    # Label recording that an alert was sent; only less-significant and
    # preliminary alerts get one.
    sent_label_canvas = identity.si()
    if alert_type == 'less-significant':
        sent_label_canvas = gracedb.create_label.si(
            'LOW_SIGNIF_PRELIM_SENT',
            superevent_id
        )
    elif alert_type == 'preliminary':
        sent_label_canvas = gracedb.create_label.si(
            'GCN_PRELIM_SENT',
            superevent_id
        )

    # NOTE: The following canvas structure was used to fix #480
    # Overall order: download/expose group, then (only if a RapidPE-RIFT
    # p_astro file exists) the RapidPE-RIFT update, then the group of
    # VOEvent (followed by circular and sent-label), kafka alert, and
    # high-profile check.
    canvas = (
        group(download_andor_expose_group)
    )
    if rapidpe_pastro_filename:
        canvas |= rapidpe_canvas

    canvas |= (
        group(
            voevent_canvas
            |
            group(
                circular_canvas,

                sent_label_canvas
            ),

            kafka_alert_canvas,

            high_profile_canvas
        )
    )

    canvas.apply_async()

1385 

1386 

@app.task(shared=False)
def _update_rapidpe_pastro(input_list, em_bright=None, pipeline_pastro=None):
    """Reconcile the RapidPE-RIFT p_astro with the pipeline's p_terr.

    The ``Terrestrial`` entry of the RapidPE-RIFT p_astro is compared with
    the one of the pipeline p_astro. If they differ, the RapidPE-RIFT
    p_astro is renormalized with the pipeline p_terr; otherwise it is left
    untouched.

    Parameters
    ----------
    input_list : list
        Results of the ``download_andor_expose_group`` built in
        :meth:`~gwcelery.tasks.orchestrator.earlywarning_preliminary_initial_update_alert`.
        If ``pipeline_pastro`` is None, the first three items are taken as
        em_bright, pipeline p_astro, and RapidPE-RIFT p_astro; otherwise only
        the first item is used, as the RapidPE-RIFT p_astro.
    em_bright : str, optional
        The em_bright contents; only used when ``pipeline_pastro`` is given.
    pipeline_pastro : str, optional
        The pipeline p_astro contents as a JSON string.

    Returns
    -------
    tuple
        ``(em_bright, rapidpe_pastro, rapidpe_pastro_updated)`` where the
        last item is True if the RapidPE-RIFT p_astro was renormalized and
        False if it is returned as received.
    """
    if pipeline_pastro is not None:
        rapidpe_pastro, *_ = input_list
    else:
        em_bright, pipeline_pastro, rapidpe_pastro, *_ = input_list

    pipeline_probs = json.loads(pipeline_pastro)
    rapidpe_probs = json.loads(rapidpe_pastro)

    p_terr_agrees = (
        rapidpe_probs["Terrestrial"] == pipeline_probs["Terrestrial"]
    )
    rapidpe_pastro_updated = not p_terr_agrees
    if rapidpe_pastro_updated:
        renormalized = rpe_pastro.renormalize_pastro_with_pipeline_pterr(
            rapidpe_probs, pipeline_probs
        )
        rapidpe_pastro = json.dumps(renormalized)

    return em_bright, rapidpe_pastro, rapidpe_pastro_updated

1420 

1421 

@app.task(shared=False)
def _update_rapidpe_pastro_shouldnt_run(*args, **kwargs):
    """Placeholder task that must never execute.

    It stands in for ``rapidpe_canvas`` when there is no RapidPE-RIFT
    p_astro file to process. Any arguments are accepted and ignored so that,
    if the task is ever chained after another task, the intended error below
    is raised rather than a ``TypeError`` about unexpected arguments.

    Raises
    ------
    RuntimeError
        Always.
    """
    raise RuntimeError(
        "The `rapidpe_canvas' was executed where it should "
        "not have been. A bug must have been introduced."
    )

1428 

1429 

@gracedb.task(shared=False)
def _upload_rapidpe_pastro_json(
    input_list,
    superevent_id,
    rapidpe_pastro_filename
):
    """Upload the RapidPE-RIFT p_astro if it has been updated.

    ``input_list`` is the output of ``_update_rapidpe_pastro``; its last item
    is the flag telling whether the RapidPE-RIFT p_astro was updated. When
    the flag is True, the second item of ``input_list`` is uploaded to the
    superevent as ``RapidPE_RIFT.p_astro.json`` with the tags ``pe``,
    ``p_astro``, and ``public``. Otherwise nothing is uploaded.

    Returns ``input_list`` without its trailing flag.
    """
    # NOTE(review): rapidpe_pastro_filename is not used in this task, and no
    # tag is added here when the p_astro was not updated -- confirm that the
    # existing file is made public elsewhere.
    *passthrough, was_updated = input_list
    if was_updated is not True:
        return passthrough

    gracedb.upload(
        input_list[1],
        "RapidPE_RIFT.p_astro.json",
        superevent_id,
        "RapidPE-RIFT Pastro results",
        ("pe", "p_astro", "public")
    )
    return passthrough

1458 

1459 

@app.task(shared=False)
def _check_pastro_and_send_alert(input_classification, skymap, em_bright,
                                 superevent, alert_type, raven_coinc=False):
    """Send an alert using the p_astro handed over by the preceding task.

    This wraps :meth:`~gwcelery.tasks.alerts.send`. ``input_classification``
    must hold exactly two items; the second one is the (potentially new)
    p_astro, which is sent together with the given sky map and em_bright.
    """
    _ignored, latest_p_astro = input_classification
    alert_contents = (skymap, em_bright, latest_p_astro)
    alerts.send.delay(
        alert_contents, superevent, alert_type, raven_coinc=raven_coinc
    )

1473 

1474 

@gracedb.task(ignore_result=True, shared=False)
def initial_alert(filenames, alert):
    """Produce an initial alert.

    The superevent is taken from the ``object`` entry of the IGWN Alert and
    handed, with ``alert_type='initial'``, to
    :meth:`~gwcelery.tasks.orchestrator.earlywarning_preliminary_initial_update_alert`.

    Parameters
    ----------
    filenames : tuple
        A list of the sky map, em_bright, and p_astro filenames.
    alert : dict
        IGWN-Alert dictionary

    Notes
    -----
    This function is decorated with :obj:`gwcelery.tasks.gracedb.task` rather
    than :obj:`gwcelery.app.task` so that a synchronous call to
    :func:`gwcelery.tasks.gracedb.get_log` is retried in the event of GraceDB
    API failures.

    """
    superevent = alert['object']
    earlywarning_preliminary_initial_update_alert(
        filenames, superevent, 'initial')

1503 

1504 

@gracedb.task(ignore_result=True, shared=False)
def update_alert(filenames, superevent_id):
    """Produce an update alert.

    The superevent is fetched from GraceDB by its ID and handed, with
    ``alert_type='update'``, to
    :meth:`~gwcelery.tasks.orchestrator.earlywarning_preliminary_initial_update_alert`.

    Parameters
    ----------
    filenames : tuple
        A list of the sky map, em_bright, and p_astro filenames.
    superevent_id : str
        The superevent ID.

    Notes
    -----
    This function is decorated with :obj:`gwcelery.tasks.gracedb.task` rather
    than :obj:`gwcelery.app.task` so that a synchronous call to
    :func:`gwcelery.tasks.gracedb.get_log` is retried in the event of GraceDB
    API failures.

    """
    fetched_superevent = gracedb.get_superevent._orig_run(superevent_id)
    earlywarning_preliminary_initial_update_alert(
        filenames, fetched_superevent, 'update')

1534 

1535 

@app.task(ignore_result=True, shared=False)
def retraction_alert(alert):
    """Produce a retraction alert.

    Builds and launches a group of two chains:

    * expose the superevent, then create a retraction VOEvent (which is
      downloaded and passed to ``gcn.send``, and tagged as public) alongside
      a retraction alert sent through ``alerts.send``;
    * create the retraction circular and upload it to the superevent as
      ``retraction-circular.txt``.
    """
    superevent_id = alert['uid']

    # What happens to the VOEvent once it has been created
    distribute_voevent = group(
        gracedb.download.s(superevent_id)
        |
        gcn.send.s(),

        gracedb.create_tag.s('public', superevent_id)
    )
    voevent_chain = (
        _create_voevent.si(
            None, superevent_id, 'retraction',
            internal=False,
            open_alert=True
        )
        |
        distribute_voevent
    )
    kafka_notice = alerts.send.si(
        None,
        alert['object'],
        'retraction'
    )
    notices_chain = (
        gracedb.expose.si(superevent_id)
        |
        group(voevent_chain, kafka_notice)
    )

    circular_chain = (
        circulars.create_retraction_circular.si(superevent_id)
        |
        gracedb.upload.s(
            'retraction-circular.txt',
            superevent_id,
            'Template for retraction GCN Circular',
            tags=['em_follow']
        )
    )

    group(notices_chain, circular_chain).apply_async()