Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1"""Module containing the functionality for creation and management of 

2superevents. 

3 

4* There is serial processing of triggers from low latency pipelines. 

5* Dedicated **superevent** queue for this purpose. 

6* Primary logic to respond to low latency triggers contained in 

7 :meth:`process` function. 

8""" 

9from itertools import filterfalse 

10 

11from celery.utils.log import get_task_logger 

12from ligo.segments import segment, segmentlist 

13 

14from ..import app 

15from . import gracedb, lvalert 

16 

# Module-level logger obtained from Celery's task logging helper.
log = get_task_logger(__name__)

# Maps a lower-cased event group name to the set of labels that
# :func:`is_complete` requires on an event of that group.
REQUIRED_LABELS_BY_GROUP = {
    'cbc': {'PASTRO_READY', 'EMBRIGHT_READY', 'SKYMAP_READY'},
    'burst': {'SKYMAP_READY'}
}
"""These labels should be present on an event to consider it to
be complete.
"""

# While this label is on a superevent, :func:`_update_superevent` does not
# re-evaluate the preferred event.
FROZEN_LABEL = 'EM_Selected'
"""This label indicates that the superevent manager should make no further
changes to the preferred event."""

# Applied by :func:`_update_superevent` when the event being processed is
# complete.
READY_LABEL = 'EM_READY'
"""This label indicates that a preferred event has been assigned and it
has all data products required to make it ready for annotations."""

34 

35 

@lvalert.handler('cbc_gstlal',
                 'cbc_spiir',
                 'cbc_pycbc',
                 'cbc_mbtaonline',
                 'burst_olib',
                 'burst_cwb',
                 shared=False)
def handle(payload):
    """Filter LVAlert messages from the search pipelines and queue the
    relevant ones for :meth:`process`.

    A message is dropped when:

    * the event has no ``far`` field, or its FAR is above the
      ``superevent_far_threshold`` configuration value;
    * the alert type is neither ``new`` nor ``label_added``;
    * a label other than ``RAVEN_ALERT`` was added and that label is not
      one of the required labels for the event's group, or the event is
      still not complete according to :meth:`is_complete`.

    Everything else is submitted to :meth:`process`, with priority 1 for
    ``new`` alerts and priority 0 for ``label_added`` alerts.

    Parameters
    ----------
    payload : dict
        LVAlert payload

    """
    alert_type = payload['alert_type']
    event = payload['object']
    gid = event['graceid']

    try:
        far = event['far']
    except KeyError:
        log.info('Skipping %s because it lacks a FAR', gid)
        return

    if far > app.conf['superevent_far_threshold']:
        log.info("Skipping processing of %s because of high FAR", gid)
        return

    if alert_type == 'new':
        priority = 1
    elif alert_type == 'label_added':
        priority = 0
        label = payload['data']['name']
        group = event['group'].lower()
        if label == 'RAVEN_ALERT':
            # A RAVEN coincidence is always forwarded, complete or not.
            log.info('Label %s added to %s', label, gid)
        elif label not in REQUIRED_LABELS_BY_GROUP[group]:
            # Labels unrelated to completeness are of no interest here.
            return
        elif not is_complete(event):
            log.info("Ignoring since %s has %s labels. "
                     "It is not complete", gid, event['labels'])
            return
    else:
        # Any other alert type is ignored.
        return

    process.si(payload).apply_async(priority=priority)

77 

78 

@gracedb.task(queue='superevent', shared=False)
@gracedb.catch_retryable_http_errors
def process(payload):
    """
    Respond to `payload` and serially processes them to create new superevents,
    add events to existing ones.

    The event carried by the payload is handled in one of four ways:

    * The event record already names its superevent: that superevent is
      fetched and handed to :meth:`_update_superevent`.
    * A superevent returned by the time-window query already lists the
      event: a ``new`` alert needs no action and the task returns early
      (skipping the publication step below); any other alert goes through
      :meth:`_update_superevent`.
    * The event's segment intersects one of the queried superevents: the
      event is added to it and the superevent window is extended when the
      event is not fully contained in it.
    * Otherwise a new superevent is created from the event.

    Afterwards, if the event passes :meth:`should_publish`, the superevent
    is labelled ``ADVREQ``. If the event is also complete,
    :data:`FROZEN_LABEL` is applied: after a countdown of
    ``preliminary_alert_timeout`` when that setting is truthy and the event
    is not labelled ``EARLY_WARNING``, immediately otherwise.

    Parameters
    ----------
    payload : dict
        LVAlert payload

    """
    event_info = payload['object']
    gid = event_info['graceid']
    category = get_category(event_info)
    t_0, t_start, t_end = get_ts(event_info)

    if event_info.get('superevent'):
        sid = event_info['superevent']
        log.info('Event %s already belongs to superevent %s', gid, sid)
        s = gracedb.get_superevent(sid)
        superevent = _SuperEvent(s['t_start'],
                                 s['t_end'],
                                 s['t_0'],
                                 s['superevent_id'],
                                 s['preferred_event'], s)
        # The event is already a member, so the window is left unchanged;
        # only the preferred event may be updated.
        _update_superevent(superevent,
                           event_info,
                           t_0=t_0,
                           t_start=None,
                           t_end=None)
    else:  # not event_info.get('superevent')
        log.info('Event %s does not yet belong to a superevent', gid)
        # Query superevents of the same category in a window around the
        # event's GPS time.
        superevents = gracedb.get_superevents('category: {} {} .. {}'.format(
            category,
            event_info['gpstime'] - app.conf['superevent_query_d_t_start'],
            event_info['gpstime'] + app.conf['superevent_query_d_t_end']))

        for s in superevents:
            if gid in s['gw_events']:
                sid = s['superevent_id']
                log.info('Event %s found assigned to superevent %s', gid, sid)
                if payload['alert_type'] == 'label_added':
                    log.info('Label %s added to %s',
                             payload['data']['name'], gid)
                elif payload['alert_type'] == 'new':
                    log.info('New type LVAlert for %s with '
                             'existing superevent %s. '
                             'No action required', gid, sid)
                    return
                superevent = _SuperEvent(s['t_start'],
                                         s['t_end'],
                                         s['t_0'],
                                         s['superevent_id'],
                                         s['preferred_event'], s)
                _update_superevent(superevent,
                                   event_info,
                                   t_0=t_0,
                                   t_start=None,
                                   t_end=None)
                break
        else:  # s not in superevents
            # Reached only when the loop finished without ``break``: no
            # queried superevent lists this event yet.
            event_segment = _Event(t_0, t_start, t_end,
                                   event_info['graceid'],
                                   event_info['group'],
                                   event_info['pipeline'],
                                   event_info.get('search'),
                                   event_dict=event_info)

            superevent = _partially_intersects(superevents, event_segment)

            if superevent:
                sid = superevent.superevent_id
                log.info('Event %s in window of %s. '
                         'Adding event to superevent', gid, sid)
                gracedb.add_event_to_superevent(sid, event_segment.gid)
                # extend the time window of the superevent
                new_superevent = superevent | event_segment
                if new_superevent != superevent:
                    log.info('%s not completely contained in %s, '
                             'extending superevent window',
                             event_segment.gid, sid)
                    new_t_start, new_t_end = new_superevent

                else:  # new_superevent == superevent
                    log.info('%s is completely contained in %s',
                             event_segment.gid, sid)
                    # None means "leave unchanged" to _update_superevent.
                    new_t_start = new_t_end = None
                _update_superevent(superevent,
                                   event_info,
                                   t_0=t_0,
                                   t_start=new_t_start,
                                   t_end=new_t_end)
            else:  # not superevent
                log.info('New event %s with no superevent in GraceDB, '
                         'creating new superevent', gid)
                sid = gracedb.create_superevent(event_info['graceid'],
                                                t_0, t_start, t_end)

    if should_publish(event_info):
        gracedb.create_label.delay('ADVREQ', sid)
        if is_complete(event_info):
            if app.conf['preliminary_alert_timeout'] \
                    and 'EARLY_WARNING' not in event_info['labels']:
                # Delay the freeze label by the configured timeout.
                gracedb.create_label.s(FROZEN_LABEL, sid).set(
                    queue='superevent',
                    countdown=app.conf['preliminary_alert_timeout']
                ).delay()
            else:  # fast path if no countdown
                gracedb.create_label(FROZEN_LABEL, sid)

190 

191 

def get_category(event):
    """Classify an event into a superevent category.

    The ``search`` field is checked first: an ``'MDC'`` search always maps
    to ``'mdc'``, regardless of the group. Otherwise the group ``'Test'``
    (exact, case-sensitive match) maps to ``'test'`` and everything else to
    ``'production'``.

    Parameters
    ----------
    event : dict
        Event dictionary (e.g., the return value from
        :meth:`gwcelery.tasks.gracedb.get_event`).

    Returns
    -------
    {'mdc', 'test', 'production'}

    """
    if event.get('search') == 'MDC':
        return 'mdc'
    return 'test' if event['group'] == 'Test' else 'production'

212 

213 

def get_ts(event):
    """Compute the time window of an event around its GPS time.

    The half-widths of the window depend on the pipeline (compared
    case-insensitively):

    * ``cwb``: the ``duration`` field of the ``MultiBurst`` attributes is
      used on both sides.
    * ``olib``: ``quality_mean / frequency_mean`` from the
      ``LalInferenceBurst`` attributes is used on both sides.
    * anything else: the per-pipeline entries of the
      :obj:`~gwcelery.conf.superevent_d_t_start` and
      :obj:`~gwcelery.conf.superevent_d_t_end` configuration values,
      falling back to ``superevent_default_d_t_start`` and
      ``superevent_default_d_t_end``.

    Parameters
    ----------
    event : dict
        Event dictionary (e.g., the return value from
        :meth:`gwcelery.tasks.gracedb.get_event`).

    Returns
    -------
    t_0: float
        Segment center time in GPS seconds.
    t_start : float
        Segment start time in GPS seconds.
    t_end : float
        Segment end time in GPS seconds.

    """
    name = event['pipeline'].lower()
    if name == 'cwb':
        before = after = event['extra_attributes']['MultiBurst']['duration']
    elif name == 'olib':
        burst = event['extra_attributes']['LalInferenceBurst']
        before = after = burst['quality_mean'] / burst['frequency_mean']
    else:
        before = app.conf['superevent_d_t_start'].get(
            name, app.conf['superevent_default_d_t_start'])
        after = app.conf['superevent_d_t_end'].get(
            name, app.conf['superevent_default_d_t_end'])

    t_0 = event['gpstime']
    return (t_0, t_0 - before, t_0 + after)

256 

257 

def get_snr(event):
    """Extract the SNR of an event from its ``extra_attributes``.

    The location of the SNR depends on the group and pipeline (both
    compared case-insensitively):

    * group ``cbc``: ``CoincInspiral`` / ``snr``
    * pipeline ``cwb``: ``MultiBurst`` / ``snr``
    * pipeline ``olib``: ``LalInferenceBurst`` / ``omicron_snr_network``

    Parameters
    ----------
    event : dict
        Event dictionary (e.g., the return value from
        :meth:`gwcelery.tasks.gracedb.get_event`).

    Returns
    -------
    snr : float
        The SNR.

    Raises
    ------
    NotImplementedError
        If the event is neither a CBC event nor from cWB or oLIB.

    """
    group = event['group'].lower()
    pipeline = event['pipeline'].lower()

    # The group takes precedence over the pipeline.
    if group == 'cbc':
        table, field = 'CoincInspiral', 'snr'
    elif pipeline == 'cwb':
        table, field = 'MultiBurst', 'snr'
    elif pipeline == 'olib':
        table, field = 'LalInferenceBurst', 'omicron_snr_network'
    else:
        raise NotImplementedError('SNR attribute not found')

    return event['extra_attributes'][table][field]

288 

289 

def get_instruments(event):
    """Collect the distinct ``ifo`` values of an event's ``SingleInspiral``
    records.

    Parameters
    ----------
    event : dict
        Event dictionary (e.g., the return value from
        :meth:`gwcelery.tasks.gracedb.get_event`).

    Returns
    -------
    set
        The set of instruments that contributed to the event.

    """
    instruments = set()
    for record in event['extra_attributes']['SingleInspiral']:
        instruments.add(record['ifo'])
    return instruments

308 

309 

def get_instruments_in_ranking_statistic(event):
    """Get the instruments that contribute to the false alarm rate.

    Parameters
    ----------
    event : dict
        Event dictionary (e.g., the return value from
        :meth:`gwcelery.tasks.gracedb.get_event`).

    Returns
    -------
    set
        The set of instruments that contributed to the ranking statistic for
        the event.

    Notes
    -----
    Some pipelines (e.g. gstlal) distinguish between the instruments that
    contributed *data* and those that were considered in the *ranking* of
    the candidate. For every pipeline except PyCBC, only the SingleInspiral
    records whose ``chisq`` field is present and not ``None`` are counted.

    For PyCBC Live in the O3 configuration, an empty chi^2 field does not
    mean that the detector was left out of the ranking: all detectors
    contribute to the significance. PyCBC is therefore special-cased and
    the comma-separated ``instruments`` field of the event is returned as a
    set instead.

    """
    if event['pipeline'].lower() == 'pycbc':
        return set(event['instruments'].split(','))

    ranked = set()
    for record in event['extra_attributes']['SingleInspiral']:
        if record.get('chisq') is None:
            # No chi^2 recorded: not part of the ranking statistic.
            continue
        ranked.add(record['ifo'])
    return ranked

348 

349 

@app.task(shared=False)
def select_preferred_event(events):
    """Pick the preferred event from a list of events, typically the
    contents of a superevent.

    Events whose ``graceid`` starts with ``'E'`` are discarded; of the
    rest, the one with the largest :meth:`keyfunc` value is returned.

    Parameters
    ----------
    events : list
        list of event dictionaries

    """
    # FIXME: Requires robust determination of an External event
    candidates = [event for event in events
                  if not event['graceid'].startswith('E')]
    return max(candidates, key=keyfunc)

365 

366 

def is_complete(event):
    """Tell whether an event carries every label required for its group.

    The required labels are looked up in :data:`REQUIRED_LABELS_BY_GROUP`
    under the lower-cased group name: ``PASTRO_READY``, ``EMBRIGHT_READY``
    and ``SKYMAP_READY`` for CBC events, ``SKYMAP_READY`` for Burst events.
    A group without an entry in that table raises :class:`KeyError`.

    Parameters
    ----------
    event : dict
        Event dictionary (e.g., the return value from
        :meth:`gwcelery.tasks.gracedb.get_event`).

    Returns
    -------
    bool
        :obj:`True` if all required labels are present on the event.

    """
    group = event['group'].lower()
    present = set(event['labels'])
    return REQUIRED_LABELS_BY_GROUP[group] <= present

386 

387 

def should_publish(event):
    """Determine whether an event should be published as a public alert.

    This is true only when every criterion returned by
    :meth:`_should_publish` holds:

    * The event's ``offline`` flag is not set and it has no ``INJ`` label.
    * The event's false alarm rate, multiplied by the applicable trials
      factor, is less than or equal to the applicable FAR threshold, or
      the event has the ``RAVEN_ALERT`` label.

    Parameters
    ----------
    event : dict
        Event dictionary (e.g., the return value from
        :meth:`gwcelery.tasks.gracedb.get_event`).

    Returns
    -------
    should_publish : bool
        :obj:`True` if the event meets the criteria for a public alert or
        :obj:`False` if it does not.

    """
    criteria = _should_publish(event)
    return all(criteria)

414 

415 

def _should_publish(event):
    """Evaluate the individual publishability criteria of an event.

    :meth:`should_publish` reduces the result with :func:`all`; the
    separate values are kept as a tuple so that :meth:`keyfunc` can rank on
    each of them.

    Events labelled ``EARLY_WARNING`` use the
    ``early_warning_alert_far_threshold`` and
    ``early_warning_alert_trials_factor`` settings; all other events use
    the per-group entries of ``preliminary_alert_far_threshold`` and
    ``preliminary_alert_trials_factor``.

    Returns
    -------
    tuple
        ``(online_and_not_injection, significant_or_raven)``

    """
    group = event['group'].lower()
    labels = event['labels']

    if 'EARLY_WARNING' in labels:
        far_threshold = app.conf['early_warning_alert_far_threshold']
        trials_factor = app.conf['early_warning_alert_trials_factor']
    else:
        far_threshold = app.conf['preliminary_alert_far_threshold'][group]
        trials_factor = app.conf['preliminary_alert_trials_factor'][group]

    weighted_far = trials_factor * event['far']
    has_raven_alert = 'RAVEN_ALERT' in labels

    online_and_not_injection = not (event['offline'] or 'INJ' in labels)
    significant_or_raven = weighted_far <= far_threshold or has_raven_alert
    return (online_and_not_injection, significant_or_raven)

432 

433 

def keyfunc(event):
    """Key function for selection of the preferred event.

    Given events ``a`` and ``b``, ``a`` is preferred over ``b`` if
    ``keyfunc(a) > keyfunc(b)``, else ``b`` is preferred.

    Parameters
    ----------
    event : dict
        Event dictionary (e.g., the return value from
        :meth:`gwcelery.tasks.gracedb.get_event`).

    Returns
    -------
    key : tuple
        The comparison key. Its fields are, in order: completeness
        (:meth:`is_complete`), the publishability criteria
        (:meth:`_should_publish`), the group rank (``cbc`` above ``burst``
        above any other group), the number of instruments (CBC only), and
        the significance (SNR for CBC, ``-FAR`` otherwise).

    Notes
    -----
    Tuples are compared lexicographically in Python: they are compared
    element-wise until an unequal pair of elements is found.

    """
    group = event['group'].lower()
    group_rank = {'burst': 0, 'cbc': 1}.get(group, -1)

    if group == 'cbc':
        n_ifos = len(get_instruments(event))
        significance = get_snr(event)
    else:
        # We don't care about the number of detectors for burst events.
        n_ifos = -1
        # Smaller FAR -> higher IFAR -> more significant.
        # Use -FAR instead of IFAR=1/FAR so that rank for FAR=0 is defined.
        significance = -event['far']

    complete = is_complete(event)
    publishable = _should_publish(event)
    return (complete, *publishable, group_rank, n_ifos, significance)

477 

478 

def _update_superevent(superevent, new_event_dict,
                       t_0, t_start, t_end):
    """Update the preferred event and/or the time window of a superevent.

    Unless the superevent carries :data:`FROZEN_LABEL`, the current
    preferred event is fetched from GraceDB and compared with the new
    event using :meth:`keyfunc`; the new event becomes the preferred event
    (and `t_0` is updated) only if its key is strictly greater. A single
    GraceDB update is issued when there is anything to change. Finally, if
    the new event is complete, :data:`READY_LABEL` is applied to the
    superevent.

    Parameters
    ----------
    superevent : object
        instance of :class:`_SuperEvent`
    new_event_dict : dict
        event info of the new trigger as a dictionary
    t_0 : float
        center time to record if the new event becomes the preferred event
    t_start : float
        start time of `superevent_id`, None for no change
    t_end : float
        end time of `superevent_id`, None for no change

    """
    superevent_id = superevent.superevent_id
    preferred_event = superevent.preferred_event

    # Collect only the fields that actually change.
    kwargs = {}
    if t_start is not None:
        kwargs['t_start'] = t_start
    if t_end is not None:
        kwargs['t_end'] = t_end
    if FROZEN_LABEL not in superevent.event_dict['labels']:
        preferred_event_dict = gracedb.get_event(preferred_event)
        if keyfunc(new_event_dict) > keyfunc(preferred_event_dict):
            # update preferred event when EM_Selected is not applied
            kwargs['t_0'] = t_0
            kwargs['preferred_event'] = new_event_dict['graceid']

    if kwargs:
        gracedb.update_superevent(superevent_id, **kwargs)
    # completeness takes first precedence in deciding preferred event
    # necessary and suffiecient condition to superevent as ready
    # NOTE(review): this check is assumed to sit at function level, i.e.
    # independent of whether an update was issued above -- confirm against
    # the upstream source.
    if is_complete(new_event_dict):
        gracedb.create_label.delay(READY_LABEL, superevent_id)

522 

523 

def _superevent_segment_list(superevents):
    """Convert superevent dictionaries into a segmentlist of
    :class:`_SuperEvent` objects.

    Each entry spans the ``t_start`` .. ``t_end`` of its superevent and
    keeps the original dictionary as its ``event_dict``. Missing keys are
    read as ``None``.

    Parameters
    ----------
    superevents : list
        List of superevent dictionaries (e.g., the return value from
        :meth:`gwcelery.tasks.gracedb.get_superevents`).

    Returns
    -------
    superevent_list : segmentlist
        superevents as a segmentlist object

    """
    segments = []
    for record in superevents:
        segments.append(_SuperEvent(record.get('t_start'),
                                    record.get('t_end'),
                                    record.get('t_0'),
                                    record.get('superevent_id'),
                                    record.get('preferred_event'),
                                    record))
    return segmentlist(segments)

547 

548 

def _partially_intersects(superevents, event_segment):
    """Find the first superevent whose time window intersects an event.

    Similar to :meth:`segmentlist.find`, except that partial overlap is
    enough for a match and the matching segment itself is returned. If
    several superevents intersect, the first one in input order wins.

    Parameters
    ----------
    superevents : list
        list of superevent dictionaries pulled down using the gracedb
        client
    event_segment : segment
        segment object to test against each superevent

    Returns
    -------
    match_segment : segment
        The intersecting :class:`_SuperEvent`, or `None` if not found

    """
    # create a segmentlist using start and end times
    candidates = _superevent_segment_list(superevents)
    return next(
        (candidate for candidate in candidates
         if candidate.intersects(event_segment)),
        None)

574 

575 

class _Event(segment):
    """An event implemented as an extension of :class:`segment`.

    The underlying segment spans `t_start` .. `t_end`; the remaining
    arguments are stored as plain attributes.

    Parameters
    ----------
    t0 : float
        center time of the event
    t_start : float
        start time of the event segment
    t_end : float
        end time of the event segment
    gid : str
        identifier of the event
    group, pipeline, search : str, optional
        stored as given
    event_dict : dict, optional
        event dictionary; a fresh empty dict is used when omitted

    """

    def __new__(cls, t0, t_start, t_end, *args, **kwargs):
        # Only the bounds define the segment; everything else is handled
        # in __init__.
        return super().__new__(cls, t_start, t_end)

    def __init__(self, t0, t_start, t_end, gid, group=None, pipeline=None,
                 search=None, event_dict=None):
        self.t0 = t0
        self.gid = gid
        self.group = group
        self.pipeline = pipeline
        self.search = search
        # A ``{}`` default would be one dict shared by every instance
        # created without an explicit ``event_dict``.
        self.event_dict = {} if event_dict is None else event_dict

590 

591 

class _SuperEvent(segment):
    """A superevent implemented as an extension of :class:`segment`.

    The underlying segment spans `t_start` .. `t_end`; the remaining
    arguments are stored as plain attributes.

    Parameters
    ----------
    t_start : float
        start time of the superevent
    t_end : float
        end time of the superevent
    t_0 : float
        center time of the superevent
    sid : str
        superevent identifier, stored as ``superevent_id``
    preferred_event : str, optional
        identifier of the preferred event
    event_dict : dict, optional
        superevent dictionary; a fresh empty dict is used when omitted

    """

    def __new__(cls, t_start, t_end, *args, **kwargs):
        # Only the bounds define the segment; everything else is handled
        # in __init__.
        return super().__new__(cls, t_start, t_end)

    def __init__(self, t_start, t_end, t_0, sid,
                 preferred_event=None, event_dict=None):
        self.t_start = t_start
        self.t_end = t_end
        self.t_0 = t_0
        self.superevent_id = sid
        self.preferred_event = preferred_event
        # A ``{}`` default would be one dict shared by every instance
        # created without an explicit ``event_dict``.
        self.event_dict = {} if event_dict is None else event_dict