Coverage for gwcelery/tasks/external_triggers.py: 100%

247 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-04-25 18:01 +0000

1from pathlib import Path 

2from urllib.parse import urlparse 

3 

4from astropy.time import Time 

5from celery import group 

6from celery.utils.log import get_logger 

7from lxml import etree 

8 

9from .. import app 

10from . import (alerts, detchar, external_skymaps, gcn, gracedb, igwn_alert, 

11 raven) 

12 

13log = get_logger(__name__) 

14 

15 

16REQUIRED_LABELS_BY_TASK = { 

17 # EM_READY implies preferred event sky map is available 

18 'compare': {'EM_READY', 'EXT_SKYMAP_READY', 'EM_COINC'}, 

19 'SoG': {'SKYMAP_READY', 'RAVEN_ALERT', 'ADVOK'} 

20} 

21"""These labels should be present on an external event to consider it to 

22be ready for sky map comparison or for post-alert analysis, such as a 

23measurment of the speed of gravity (SoG). 

24""" 

25 

26FERMI_GRB_CLASS_VALUE = 4 

27"""This is the index that denote GRBs within Fermi's Flight Position 

28classification.""" 

29 

30FERMI_GRB_CLASS_THRESH = 50 

31"""This values denotes the threshold of the most likely Fermi source 

32classification, above which we will consider a Fermi Flight Position 

33notice.""" 

34 

35 

36@gcn.handler(gcn.NoticeType.SNEWS, 

37 queue='exttrig', 

38 shared=False) 

39def handle_snews_gcn(payload): 

40 """Handles the GCN notice payload from SNEWS alerts. 

41 

42 Prepares the alert to be sent to graceDB as external events, updating the 

43 info if it already exists. 

44 

45 Parameters 

46 ---------- 

47 payload : str 

48 XML GCN notice alert packet in string format 

49 

50 """ 

51 root = etree.fromstring(payload) 

52 

53 # Get TrigID and Test Event Boolean 

54 trig_id = root.find("./What/Param[@name='TrigID']").attrib['value'] 

55 ext_group = 'Test' if root.attrib['role'] == 'test' else 'External' 

56 

57 event_observatory = 'SNEWS' 

58 if 'mdc-test_event' in root.attrib['ivorn'].lower(): 

59 search = 'MDC' 

60 else: 

61 search = 'Supernova' 

62 query = 'group: External pipeline: {} grbevent.trigger_id = "{}"'.format( 

63 event_observatory, trig_id) 

64 

65 ( 

66 gracedb.get_events.si(query=query) 

67 | 

68 _create_replace_external_event_and_skymap.s( 

69 payload, search, event_observatory, ext_group=ext_group 

70 ) 

71 ).delay() 

72 

73 

74@gcn.handler(gcn.NoticeType.FERMI_GBM_ALERT, 

75 gcn.NoticeType.FERMI_GBM_FLT_POS, 

76 gcn.NoticeType.FERMI_GBM_GND_POS, 

77 gcn.NoticeType.FERMI_GBM_FIN_POS, 

78 gcn.NoticeType.SWIFT_BAT_GRB_POS_ACK, 

79 gcn.NoticeType.FERMI_GBM_SUBTHRESH, 

80 gcn.NoticeType.INTEGRAL_WAKEUP, 

81 gcn.NoticeType.INTEGRAL_REFINED, 

82 gcn.NoticeType.INTEGRAL_OFFLINE, 

83 queue='exttrig', 

84 shared=False) 

85def handle_grb_gcn(payload): 

86 """Handles the payload from Fermi, Swift, and INTEGRAL GCN notices. 

87 

88 Filters out candidates likely to be noise. Creates external events 

89 from the notice if new notice, otherwise updates existing event. Then 

90 creates and/or grabs external sky map to be uploaded to the external event. 

91 

92 More info for these notices can be found at: 

93 Fermi-GBM: https://gcn.gsfc.nasa.gov/fermi_grbs.html 

94 Fermi-GBM sub: https://gcn.gsfc.nasa.gov/fermi_gbm_subthresh_archive.html 

95 Swift: https://gcn.gsfc.nasa.gov/swift.html 

96 INTEGRAL: https://gcn.gsfc.nasa.gov/integral.html 

97 

98 Parameters 

99 ---------- 

100 payload : str 

101 XML GCN notice alert packet in string format 

102 

103 """ 

104 root = etree.fromstring(payload) 

105 u = urlparse(root.attrib['ivorn']) 

106 stream_path = u.path 

107 

108 stream_obsv_dict = {'/SWIFT': 'Swift', 

109 '/Fermi': 'Fermi', 

110 '/INTEGRAL': 'INTEGRAL'} 

111 event_observatory = stream_obsv_dict[stream_path] 

112 

113 ext_group = 'Test' if root.attrib['role'] == 'test' else 'External' 

114 

115 # Block Test INTEGRAL events on the production server to prevent 

116 # unneeded queries of old GW data during detchar check 

117 if event_observatory == 'INTEGRAL' and ext_group == 'Test' and \ 

118 app.conf['gracedb_host'] == 'gracedb.ligo.org': 

119 return 

120 # Get TrigID 

121 elif event_observatory == 'INTEGRAL' and \ 

122 not any([x in u.fragment for x in ['O3-replay', 'MDC-test']]): 

123 # FIXME: revert all this if INTEGRAL fixes their GCN notices 

124 # If INTEGRAL, get trigger ID from ivorn rather than the TrigID field 

125 # unless O3 replay or MDC event 

126 trig_id = u.fragment.split('_')[-1].split('-')[0] 

127 # Modify the TrigID field so GraceDB has the correct value 

128 root.find("./What/Param[@name='TrigID']").attrib['value'] = \ 

129 str(trig_id).encode() 

130 # Apply changes to payload delivered to GraceDB 

131 payload = etree.tostring(root, xml_declaration=True, encoding="UTF-8") 

132 else: 

133 try: 

134 trig_id = \ 

135 root.find("./What/Param[@name='TrigID']").attrib['value'] 

136 except AttributeError: 

137 trig_id = \ 

138 root.find("./What/Param[@name='Trans_Num']").attrib['value'] 

139 

140 notice_type = \ 

141 int(root.find("./What/Param[@name='Packet_Type']").attrib['value']) 

142 

143 reliability = root.find("./What/Param[@name='Reliability']") 

144 if reliability is not None and int(reliability.attrib['value']) <= 4: 

145 return 

146 

147 # Check if Fermi trigger is likely noise by checking classification 

148 # Most_Likely_Index of 4 is an astrophysical GRB 

149 # If not at least 50% chance of GRB we will not consider it for RAVEN 

150 likely_source = root.find("./What/Param[@name='Most_Likely_Index']") 

151 likely_prob = root.find("./What/Param[@name='Most_Likely_Prob']") 

152 not_likely_grb = likely_source is not None and \ 

153 (likely_source.attrib['value'] != FERMI_GRB_CLASS_VALUE 

154 or likely_prob.attrib['value'] < FERMI_GRB_CLASS_THRESH) 

155 

156 # Check if initial Fermi alert. These are generally unreliable and should 

157 # never trigger a RAVEN alert, but will give us earlier warning of a 

158 # possible coincidence. Later notices could change this. 

159 initial_gbm_alert = notice_type == gcn.NoticeType.FERMI_GBM_ALERT 

160 

161 # Check if Swift has lost lock. If so then veto 

162 lost_lock = \ 

163 root.find("./What/Group[@name='Solution_Status']" + 

164 "/Param[@name='StarTrack_Lost_Lock']") 

165 swift_veto = lost_lock is not None and lost_lock.attrib['value'] == 'true' 

166 

167 # Only send alerts if likely a GRB, is not a low-confidence early Fermi 

168 # alert, and if not a Swift veto 

169 if not_likely_grb or initial_gbm_alert or swift_veto: 

170 label = 'NOT_GRB' 

171 else: 

172 label = None 

173 

174 ivorn = root.attrib['ivorn'] 

175 if 'subthresh' in ivorn.lower(): 

176 search = 'SubGRB' 

177 elif 'mdc-test_event' in ivorn.lower(): 

178 search = 'MDC' 

179 else: 

180 search = 'GRB' 

181 

182 if search == 'SubGRB' and event_observatory == 'Fermi': 

183 skymap_link = \ 

184 root.find("./What/Param[@name='HealPix_URL']").attrib['value'] 

185 else: 

186 skymap_link = None 

187 

188 query = 'group: External pipeline: {} grbevent.trigger_id = "{}"'.format( 

189 event_observatory, trig_id) 

190 

191 ( 

192 gracedb.get_events.si(query=query) 

193 | 

194 _create_replace_external_event_and_skymap.s( 

195 payload, search, event_observatory, 

196 ext_group=ext_group, label=label, 

197 notice_date=root.find("./Who/Date").text, 

198 notice_type=notice_type, 

199 skymap_link=skymap_link, 

200 use_radec=search in {'GRB', 'MDC'} 

201 ) 

202 ).delay() 

203 

204 

205@igwn_alert.handler('superevent', 

206 'mdc_superevent', 

207 'external_fermi', 

208 'external_swift', 

209 'external_integral', 

210 shared=False) 

211def handle_grb_igwn_alert(alert): 

212 """Parse an IGWN alert message related to superevents/GRB external triggers 

213 and dispatch it to other tasks. 

214 

215 Notes 

216 ----- 

217 This IGWN alert message handler is triggered by creating a new superevent 

218 or GRB external trigger event, a label associated with completeness of 

219 skymaps or change in state, or if a sky map file is uploaded: 

220 

221 * New event/superevent triggers a coincidence search with 

222 :meth:`gwcelery.tasks.raven.coincidence_search`. 

223 * When both a GW and GRB sky map are available during a coincidence, 

224 indicated by the labels ``EM_READY`` and ``EXT_SKYMAP_READY`` 

225 respectively on the external event, this triggers the spacetime coinc 

226 FAR to be calculated and a combined GW-GRB sky map is created using 

227 :meth:`gwcelery.tasks.external_skymaps.create_combined_skymap`. 

228 * Re-run sky map comparison if complete, and either the GW or GRB sky 

229 map has been updated or if the preferred event changed. 

230 * Re-check RAVEN publishing conditions if the GRB was previously 

231 considered non-astrophycial but now should be considered. 

232 

233 Parameters 

234 ---------- 

235 alert : dict 

236 IGWN alert packet 

237 

238 """ 

239 # Determine GraceDB ID 

240 graceid = alert['uid'] 

241 

242 # launch searches 

243 if alert['alert_type'] == 'new': 

244 if alert['object'].get('group') == 'External': 

245 # launch search with MDC events and exit 

246 if alert['object']['search'] == 'MDC': 

247 raven.coincidence_search(graceid, alert['object'], 

248 group='CBC', se_searches=['MDC']) 

249 raven.coincidence_search(graceid, alert['object'], 

250 group='Burst', se_searches=['MDC']) 

251 return 

252 

253 if alert['object']['search'] in ['SubGRB', 'SubGRBTargeted']: 

254 # if sub-threshold GRB, launch search with that pipeline 

255 raven.coincidence_search( 

256 graceid, alert['object'], 

257 searches=['SubGRB', 'SubGRBTargeted'], 

258 se_searches=['AllSky'], 

259 pipelines=[alert['object']['pipeline']]) 

260 else: 

261 # launch standard Burst-GRB search 

262 raven.coincidence_search(graceid, alert['object'], 

263 group='Burst', se_searches=['AllSky']) 

264 

265 # launch standard CBC-GRB search 

266 raven.coincidence_search(graceid, alert['object'], 

267 group='CBC', searches=['GRB']) 

268 elif 'S' in graceid: 

269 # launch standard GRB search based on group 

270 gw_group = alert['object']['preferred_event_data']['group'] 

271 search = alert['object']['preferred_event_data']['search'] 

272 

273 # launch search with MDC events and exit 

274 if alert['object']['preferred_event_data']['search'] == 'MDC': 

275 raven.coincidence_search(graceid, alert['object'], 

276 group=gw_group, searches=['MDC']) 

277 return 

278 # Don't run search for BBH or IMBH Burst search 

279 elif gw_group == 'Burst' and search.lower() != 'allsky': 

280 return 

281 

282 se_searches = [] if gw_group == 'CBC' else ['AllSky'] 

283 searches = (['SubGRB', 'SubGRBTargeted'] if gw_group == 'CBC' else 

284 ['SubGRBTargeted']) 

285 # launch standard GRB search 

286 raven.coincidence_search(graceid, alert['object'], 

287 group=gw_group, searches=['GRB'], 

288 se_searches=se_searches) 

289 # launch subthreshold search for Fermi and Swift separately to use 

290 # different time windows, for both CBC and Burst 

291 for pipeline in ['Fermi', 'Swift']: 

292 raven.coincidence_search( 

293 graceid, alert['object'], group=gw_group, 

294 searches=searches, pipelines=[pipeline]) 

295 # re-run raven pipeline and create combined sky map (if not a Swift event) 

296 # when sky maps are available 

297 elif alert['alert_type'] == 'label_added' and \ 

298 alert['object'].get('group') == 'External': 

299 if _skymaps_are_ready(alert['object'], alert['data']['name'], 

300 'compare'): 

301 # if both sky maps present and a coincidence, re-run RAVEN 

302 # pipeline and create combined sky maps 

303 ext_event = alert['object'] 

304 superevent_id, ext_id = _get_superevent_ext_ids( 

305 graceid, ext_event) 

306 superevent = gracedb.get_superevent(superevent_id) 

307 _relaunch_raven_pipeline_with_skymaps( 

308 superevent, ext_event, graceid) 

309 elif 'EM_COINC' in alert['object']['labels']: 

310 # if not complete, check if GW sky map; apply label to external 

311 # event if GW sky map 

312 se_labels = gracedb.get_labels(alert['object']['superevent']) 

313 if 'SKYMAP_READY' in se_labels: 

314 gracedb.create_label.si('SKYMAP_READY', graceid).delay() 

315 if 'EM_READY' in se_labels: 

316 gracedb.create_label.si('EM_READY', graceid).delay() 

317 # apply labels from superevent to external event to update state 

318 # and trigger functionality requiring sky maps, etc. 

319 elif alert['alert_type'] == 'label_added' and 'S' in graceid: 

320 if 'SKYMAP_READY' in alert['object']['labels']: 

321 # if sky map in superevent, apply label to all external events 

322 # at the time 

323 group( 

324 gracedb.create_label.si('SKYMAP_READY', ext_id) 

325 for ext_id in alert['object']['em_events'] 

326 ).delay() 

327 if 'EM_READY' in alert['object']['labels']: 

328 # if sky map not in superevent but in preferred event, apply label 

329 # to all external events at the time 

330 group( 

331 gracedb.create_label.si('EM_READY', ext_id) 

332 for ext_id in alert['object']['em_events'] 

333 ).delay() 

334 if _skymaps_are_ready(alert['object'], alert['data']['name'], 'SoG') \ 

335 and alert['object']['space_coinc_far'] is not None: 

336 # if a superevent is vetted by ADVOK and a spatial joint FAR is 

337 # available, check if SoG publishing conditions are met 

338 ( 

339 gracedb.get_event.si(alert['object']['em_type']) 

340 | 

341 raven.sog_paper_pipeline.s(alert['object']) 

342 ).delay() 

343 # if new GW or external sky map after first being available, try to remake 

344 # combine sky map and rerun raven pipeline 

345 elif alert['alert_type'] == 'log' and \ 

346 'EM_COINC' in alert['object']['labels'] and \ 

347 'fit' in alert['data']['filename'] and \ 

348 'flat' not in alert['data']['comment'].lower() and \ 

349 (alert['data']['filename'] != 

350 external_skymaps.COMBINED_SKYMAP_FILENAME_MULTIORDER): 

351 superevent_id, external_id = _get_superevent_ext_ids( 

352 graceid, alert['object']) 

353 if 'S' in graceid: 

354 superevent = alert['object'] 

355 else: 

356 superevent = gracedb.get_superevent(alert['object']['superevent']) 

357 external_event = alert['object'] 

358 # check if combined sky map already made, with the exception of Swift 

359 # which will fail 

360 if 'S' in graceid: 

361 # Rerun for all eligible external events 

362 for ext_id in superevent['em_events']: 

363 external_event = gracedb.get_event(ext_id) 

364 if REQUIRED_LABELS_BY_TASK['compare'].issubset( 

365 set(external_event['labels'])): 

366 _relaunch_raven_pipeline_with_skymaps( 

367 superevent, external_event, graceid, 

368 use_superevent_skymap=True) 

369 else: 

370 if REQUIRED_LABELS_BY_TASK['compare'].issubset( 

371 set(external_event['labels'])): 

372 _relaunch_raven_pipeline_with_skymaps( 

373 superevent, external_event, graceid) 

374 # Rerun the coincidence FAR calculation if possible with combined sky map 

375 # if the preferred event changes 

376 # We don't want to run this logic if PE results are present 

377 elif alert['alert_type'] == 'log' and \ 

378 'PE_READY' not in alert['object']['labels'] and \ 

379 'EM_COINC' in alert['object']['labels']: 

380 new_log_comment = alert['data'].get('comment', '') 

381 if 'S' in graceid and \ 

382 new_log_comment.startswith('Updated superevent parameters: ' 

383 'preferred_event: '): 

384 superevent = alert['object'] 

385 # Rerun for all eligible external events 

386 for ext_id in superevent['em_events']: 

387 external_event = gracedb.get_event(ext_id) 

388 if REQUIRED_LABELS_BY_TASK['compare'].issubset( 

389 set(external_event['labels'])): 

390 _relaunch_raven_pipeline_with_skymaps( 

391 superevent, external_event, graceid, 

392 use_superevent_skymap=False) 

393 elif alert['alert_type'] == 'label_removed' and \ 

394 alert['object'].get('group') == 'External': 

395 if alert['data']['name'] == 'NOT_GRB' and \ 

396 'EM_COINC' in alert['object']['labels']: 

397 # if NOT_GRB is removed, re-check publishing conditions 

398 superevent_id = alert['object']['superevent'] 

399 superevent = gracedb.get_superevent(superevent_id) 

400 gw_group = superevent['preferred_event_data']['group'] 

401 coinc_far_dict = { 

402 'temporal_coinc_far': superevent['time_coinc_far'], 

403 'spatiotemporal_coinc_far': superevent['space_coinc_far'] 

404 } 

405 raven.trigger_raven_alert(coinc_far_dict, superevent, graceid, 

406 alert['object'], gw_group) 

407 

408 

409@igwn_alert.handler('superevent', 

410 'mdc_superevent', 

411 'external_snews', 

412 shared=False) 

413def handle_snews_igwn_alert(alert): 

414 """Parse an IGWN alert message related to superevents/Supernovae external 

415 triggers and dispatch it to other tasks. 

416 

417 Notes 

418 ----- 

419 This igwn_alert message handler is triggered whenever a new superevent 

420 or Supernovae external event is created: 

421 

422 * New event triggers a coincidence search with 

423 :meth:`gwcelery.tasks.raven.coincidence_search`. 

424 

425 Parameters 

426 ---------- 

427 alert : dict 

428 IGWN alert packet 

429 

430 """ 

431 # Determine GraceDB ID 

432 graceid = alert['uid'] 

433 

434 if alert['alert_type'] == 'new': 

435 if alert['object'].get('superevent_id'): 

436 group = alert['object']['preferred_event_data']['group'] 

437 search = alert['object']['preferred_event_data']['search'] 

438 searches = ['MDC'] if search == 'MDC' else ['Supernova'] 

439 # Run only on Test and Burst superevents 

440 if group in {'Burst', 'Test'}: 

441 raven.coincidence_search(graceid, alert['object'], 

442 group='Burst', searches=searches, 

443 pipelines=['SNEWS']) 

444 else: 

445 # Run on SNEWS event, either real or test 

446 search = alert['object']['search'] 

447 if search == 'MDC': 

448 raven.coincidence_search(graceid, alert['object'], 

449 group='Burst', 

450 se_searches=['MDC'], 

451 pipelines=['SNEWS']) 

452 elif search == 'Supernova': 

453 raven.coincidence_search(graceid, alert['object'], 

454 group='Burst', searches=['Supernova'], 

455 pipelines=['SNEWS']) 

456 

457 

458@alerts.handler('fermi', 

459 'swift') 

460def handle_targeted_kafka_alert(alert): 

461 """Parse an alert sent via Kafka from a MOU partner in our joint 

462 subthreshold targeted search. 

463 

464 Parameters 

465 ---------- 

466 alert : dict 

467 Kafka alert packet 

468 

469 """ 

470 # Convert alert to VOEvent format 

471 # FIXME: This is required until native ingesting of kafka events in GraceDB 

472 payload, pipeline, time, trig_id = _kafka_to_voevent(alert) 

473 

474 # Veto events that don't pass GRB FAR threshold 

475 far_grb = alert['far'] 

476 veto_event = \ 

477 app.conf['raven_targeted_far_thresholds']['GRB'][pipeline] < far_grb 

478 label = ('NOT_GRB' if alert['alert_type'] == "retraction" or veto_event 

479 else None) 

480 

481 # Look whether a previous event with the same ID exists 

482 query = 'group: External pipeline: {} grbevent.trigger_id = "{}"'.format( 

483 pipeline, trig_id) 

484 

485 ( 

486 gracedb.get_events.si(query=query) 

487 | 

488 _create_replace_external_event_and_skymap.s( 

489 payload, 'SubGRBTargeted', pipeline, 

490 label=label, notice_date=time, 

491 skymap=alert.get('healpix_file'), 

492 use_radec=('ra' in alert and 'dec' in alert) 

493 ) 

494 ).delay() 

495 

496 

497def _skymaps_are_ready(event, label, task): 

498 """Determine whether labels are complete to launch a certain task. 

499 

500 Parameters 

501 ---------- 

502 event : dict 

503 Either Superevent or external event dictionary 

504 graceid : str 

505 GraceDB ID 

506 task : str 

507 Determines which label schmema to check for completeness 

508 

509 Returns 

510 ------- 

511 labels_pass : bool 

512 True if all the require labels are present and the given label is part 

513 of that set 

514 """ 

515 label_set = set(event['labels']) 

516 required_labels = REQUIRED_LABELS_BY_TASK[task] 

517 return required_labels.issubset(label_set) and label in required_labels 

518 

519 

520def _get_superevent_ext_ids(graceid, event): 

521 """Grab superevent and external event IDs from a given event. 

522 

523 Parameters 

524 ---------- 

525 graceid : str 

526 GraceDB ID 

527 event : dict 

528 Either Superevent or external event dictionary 

529 

530 Returns 

531 ------- 

532 se_id, ext_id : tuple 

533 Tuple of superevent and external event GraceDB IDs 

534 

535 """ 

536 if 'S' in graceid: 

537 se_id = event['superevent_id'] 

538 ext_id = event['em_type'] 

539 else: 

540 se_id = event['superevent'] 

541 ext_id = event['graceid'] 

542 return se_id, ext_id 

543 

544 

545@app.task(shared=False) 

546def _launch_external_detchar(event): 

547 """Launch detchar tasks for an external event. 

548 

549 Parameters 

550 ---------- 

551 event : dict 

552 External event dictionary 

553 

554 Returns 

555 ------- 

556 event : dict 

557 External event dictionary 

558 

559 """ 

560 start = event['gpstime'] 

561 if event['pipeline'] == 'SNEWS': 

562 start, end = event['gpstime'], event['gpstime'] 

563 else: 

564 integration_time = \ 

565 event['extra_attributes']['GRB']['trigger_duration'] or 4.0 

566 end = start + integration_time 

567 detchar.check_vectors.si(event, event['graceid'], start, end).delay() 

568 

569 return event 

570 

571 

572def _relaunch_raven_pipeline_with_skymaps(superevent, ext_event, graceid, 

573 use_superevent_skymap=None): 

574 """Relaunch the RAVEN sky map comparison workflow, include recalculating 

575 the joint FAR with updated sky map info and creating a new combined sky 

576 map. 

577 

578 Parameters 

579 ---------- 

580 superevent : dict 

581 Superevent dictionary 

582 exttrig : dict 

583 External event dictionary 

584 graceid : str 

585 GraceDB ID of event 

586 use_superevent_skymap : bool 

587 If True/False, use/don't use skymap info from superevent. 

588 Else if None, check SKYMAP_READY label in external event. 

589 

590 """ 

591 gw_group = superevent['preferred_event_data']['group'] 

592 tl, th = raven._time_window(graceid, gw_group, 

593 [ext_event['pipeline']], 

594 [ext_event['search']]) 

595 # FIXME: both overlap integral and combined sky map could be 

596 # done by the same function since they are so similar 

597 use_superevent = (use_superevent_skymap 

598 if use_superevent_skymap is not None else 

599 'SKYMAP_READY' in ext_event['labels']) 

600 canvas = raven.raven_pipeline.si( 

601 [ext_event] if 'S' in graceid else [superevent], 

602 graceid, 

603 superevent if 'S' in graceid else ext_event, 

604 tl, th, gw_group, use_superevent_skymap=use_superevent) 

605 # Create new updated combined sky map 

606 canvas |= external_skymaps.create_combined_skymap.si( 

607 superevent['superevent_id'], ext_event['graceid'], 

608 preferred_event=( 

609 None if use_superevent 

610 else superevent['preferred_event'])) 

611 canvas.delay() 

612 

613 

614@app.task(shared=False) 

615def _create_replace_external_event_and_skymap( 

616 events, payload, search, pipeline, 

617 label=None, ext_group='External', notice_date=None, notice_type=None, 

618 skymap=None, skymap_link=None, use_radec=False): 

619 """Either create a new external event or replace an old one if applicable 

620 Then either uploads a given sky map, try to download one given a link, or 

621 create one given coordinates. 

622 

623 Parameters 

624 ---------- 

625 events : list 

626 List of external events sharing the same trigger ID 

627 payload : str 

628 VOEvent of event being considered 

629 search : str 

630 Search of external event 

631 pipeline : str 

632 Pipeline of external evevent 

633 label : list 

634 Label to be uploaded along with external event. If None, removes 

635 'NOT_GRB' label from event 

636 ext_group : str 

637 Group of external event, 'External' or 'Test' 

638 notice_date : str 

639 External event trigger time in ISO format 

640 notice_type : int 

641 GCN notice type integer 

642 skymap : str 

643 Base64 encoded sky map 

644 skymap_link : str 

645 Link to external sky map to be downloaded 

646 use_radec : bool 

647 If True, try to create sky map using given coordinates 

648 

649 """ 

650 skymap_detchar_canvas = () 

651 upload_new_skymap = True 

652 # If previous event, try to append 

653 if events and ext_group == 'External': 

654 assert len(events) == 1, 'Found more than one matching GraceDB entry' 

655 event, = events 

656 graceid = event['graceid'] 

657 # If previous Fermi sky map, check if from official analysis 

658 if 'EXT_SKYMAP_READY' in event['labels'] and \ 

659 event['pipeline'] == 'Fermi': 

660 # If True, block further sky maps from being uploaded automatically 

661 # Note that sky maps can also be uploaded via the dashboard 

662 upload_new_skymap = \ 

663 (external_skymaps.FERMI_OFFICIAL_SKYMAP_FILENAME not in 

664 external_skymaps.get_skymap_filename(graceid, is_gw=False)) 

665 if label: 

666 create_replace_canvas = gracedb.create_label.si(label, graceid) 

667 else: 

668 create_replace_canvas = gracedb.remove_label.si('NOT_GRB', graceid) 

669 

670 # Prevent SubGRBs from appending GRBs, also append if same search 

671 if search == 'GRB' or search == event['search']: 

672 # Replace event and pass already existing event dictionary 

673 create_replace_canvas |= gracedb.replace_event.si(graceid, payload) 

674 create_replace_canvas |= gracedb.get_event.si(graceid) 

675 else: 

676 # If not appending just exit 

677 return 

678 

679 # If new event, create new entry in GraceDB and launch detchar 

680 else: 

681 create_replace_canvas = gracedb.create_event.si( 

682 filecontents=payload, 

683 search=search, 

684 group=ext_group, 

685 pipeline=pipeline, 

686 labels=[label] if label else None) 

687 skymap_detchar_canvas += _launch_external_detchar.s(), 

688 

689 # Use sky map if provided 

690 if skymap: 

691 skymap_detchar_canvas += \ 

692 external_skymaps.read_upload_skymap_from_base64.s(skymap), 

693 # Otherwise if no official Fermi sky map, upload one based on given info 

694 elif upload_new_skymap: 

695 # Otherwise grab sky map from provided link 

696 if skymap_link: 

697 skymap_detchar_canvas += \ 

698 external_skymaps.get_upload_external_skymap.s(skymap_link), 

699 # Otherwise if threshold Fermi try to grab sky map 

700 elif pipeline == 'Fermi' and search == 'GRB': 

701 skymap_detchar_canvas += \ 

702 external_skymaps.get_upload_external_skymap.s(None), 

703 # Otherwise create sky map from given coordinates 

704 if use_radec: 

705 skymap_detchar_canvas += \ 

706 external_skymaps.create_upload_external_skymap.s( 

707 notice_type, notice_date), 

708 

709 ( 

710 create_replace_canvas 

711 | 

712 group(skymap_detchar_canvas) 

713 ).delay() 

714 

715 

716def _kafka_to_voevent(alert): 

717 """Parse an alert sent via Kafka from a MOU partner in our joint 

718 subthreshold targeted search and convert to an equivalent XML string 

719 GCN VOEvent. 

720 

721 Parameters 

722 ---------- 

723 alert : dict 

724 Kafka alert packet 

725 

726 Returns 

727 ------- 

728 payload : str 

729 XML GCN notice alert packet in string format 

730 

731 """ 

732 # Define basic values 

733 pipeline = alert['mission'] 

734 start_time = alert['trigger_time'] 

735 alert_time = alert['alert_datetime'] 

736 far = alert['far'] 

737 duration = alert['rate_duration'] 

738 id = '_'.join(str(x) for x in alert['id']) 

739 # Use central time since starting time is not well measured 

740 central_time = \ 

741 Time(start_time, format='isot', scale='utc').to_value('gps') + \ 

742 .5 * duration 

743 trigger_time = \ 

744 str(Time(central_time, format='gps', scale='utc').isot) + 'Z' 

745 

746 # sky localization may not be available 

747 ra = alert.get('ra') 

748 dec = alert.get('dec') 

749 # Try to get dec first then ra, None if both misssing 

750 error = alert.get('dec_uncertainty') 

751 if error is None: 

752 error = alert.get('ra_uncertainty') 

753 # Argument should be list if not None 

754 if isinstance(error, list): 

755 error = error[0] 

756 # if any missing sky map info, set to zeros so will be ignored later 

757 if ra is None or dec is None or error is None: 

758 ra, dec, error = 0., 0., 0. 

759 

760 # Load template 

761 fname = str(Path(__file__).parent / 

762 '../tests/data/{}_subgrbtargeted_template.xml'.format( 

763 pipeline.lower())) 

764 root = etree.parse(fname) 

765 

766 # Update template values 

767 # Change ivorn to indicate this is a subthreshold targeted event 

768 root.xpath('.')[0].attrib['ivorn'] = \ 

769 'ivo://lvk.internal/{0}#targeted_subthreshold-{1}'.format( 

770 pipeline.lower(), trigger_time).encode() 

771 

772 # Update ID 

773 root.find("./What/Param[@name='TrigID']").attrib['value'] = \ 

774 id.encode() 

775 

776 # Change times to chosen time 

777 root.find("./Who/Date").text = str(alert_time).encode() 

778 root.find(("./WhereWhen/ObsDataLocation/" 

779 "ObservationLocation/AstroCoords/Time/TimeInstant/" 

780 "ISOTime")).text = str(trigger_time).encode() 

781 

782 root.find("./What/Param[@name='FAR']").attrib['value'] = \ 

783 str(far).encode() 

784 

785 root.find("./What/Param[@name='Integ_Time']").attrib['value'] = \ 

786 str(duration).encode() 

787 

788 # Sky position 

789 root.find(("./WhereWhen/ObsDataLocation/" 

790 "ObservationLocation/AstroCoords/Position2D/Value2/" 

791 "C1")).text = str(ra).encode() 

792 root.find(("./WhereWhen/ObsDataLocation/" 

793 "ObservationLocation/AstroCoords/Position2D/Value2/" 

794 "C2")).text = str(dec).encode() 

795 root.find(("./WhereWhen/ObsDataLocation/" 

796 "ObservationLocation/AstroCoords/Position2D/" 

797 "Error2Radius")).text = str(error).encode() 

798 

799 return (etree.tostring(root, xml_declaration=True, encoding="UTF-8", 

800 pretty_print=True), 

801 pipeline, trigger_time.replace('Z', ''), id)