Coverage for gwcelery/tasks/superevents.py: 87%
241 statements
« prev ^ index » next coverage.py v7.4.2, created at 2024-03-28 17:21 +0000
« prev ^ index » next coverage.py v7.4.2, created at 2024-03-28 17:21 +0000
1"""Module containing the functionality for creation and management of
2superevents.
4* There is serial processing of triggers from low latency pipelines.
5* Dedicated **superevent** queue for this purpose.
6* Primary logic to respond to low latency triggers contained in
7 :meth:`process` function.
8"""
9from itertools import filterfalse, groupby
11from celery.utils.log import get_task_logger
12from ligo.segments import segment, segmentlist
14from .. import app
15from . import gracedb, igwn_alert
17log = get_task_logger(__name__)
19REQUIRED_LABELS_BY_GROUP = {
20 'cbc': {'PASTRO_READY', 'EMBRIGHT_READY', 'SKYMAP_READY'},
21 'burst': {'SKYMAP_READY'}
22}
23"""These labels should be present on an event to consider it to
24be complete.
25"""
27FROZEN_LABEL = 'LOW_SIGNIF_LOCKED'
28"""This label indicates that the superevent manager should make no further
29changes to the preferred event."""
31SIGNIFICANT_LABEL = 'SIGNIF_LOCKED'
32"""This label indicates that the superevent is elevated to significant"""
34EARLY_WARNING_LABEL = 'EARLY_WARNING'
35"""This label indicates that the superevent contains a significant
36early warning event."""
38EARLY_WARNING_SEARCH_NAME = 'EarlyWarning'
39"""Search name for Early Warning searches. Only significant events
40result in consideration by the superevent manager."""
42SUBSOLAR_SEARCH_NAME = 'SSM'
43"""Search name for subsolar mass search. These are ignored by
44the superevent manager."""
46READY_LABEL = 'EM_READY'
47"""This label indicates that a preferred event has been assigned and it
48has all data products required to make it ready for annotations."""
50VT_SEARCH_NAME = 'VTInjection'
51"""Search name for events uploaded as a part of estimating the spacetime
52sensitive volume. Events from this search do not form superevents, and
53are not annotated."""
56@igwn_alert.handler('cbc_gstlal',
57 'cbc_spiir',
58 'cbc_pycbc',
59 'cbc_mbta',
60 'burst_olib',
61 'burst_cwb',
62 'burst_mly',
63 shared=False)
64def handle(payload):
65 """Respond to IGWN alert topics from low-latency search pipelines and
66 delegate to :meth:`process` for superevent management.
67 """
68 alert_type = payload['alert_type']
69 gid = payload['object']['graceid']
70 alert_group = payload['object']['group'].lower()
71 alert_search = payload['object']['search']
73 ifos = get_instruments(payload['object']) if alert_group == 'cbc' \
74 else payload['object']['instruments'].split(',')
75 # Ignore inclusion of events involving KAGRA; revert when policy is changed
76 if "K1" in ifos:
77 log.info('Skipping %s because it involves KAGRA data', gid)
78 return
79 # Ignore inclusion of events from VT search
80 if alert_search == VT_SEARCH_NAME:
81 log.info("Skipping {} event {}".format(VT_SEARCH_NAME, gid))
82 return
84 try:
85 far = payload['object']['far']
86 except KeyError:
87 log.info('Skipping %s because it lacks a FAR', gid)
88 return
90 if far > app.conf['superevent_far_threshold']:
91 log.info("Skipping processing of %s because of high FAR", gid)
92 return
94 priority = 1
95 if alert_type == 'label_added':
96 priority = 0
97 label = payload['data']['name']
98 group = payload['object']['group'].lower()
99 search = payload['object']['search'].lower()
100 pipeline = payload['object']['pipeline'].lower()
101 # Special case of cWB-BBH that require the same label of CBC
102 if search == 'bbh' and pipeline == 'cwb':
103 group = 'cbc'
104 if label == 'RAVEN_ALERT':
105 log.info('Label %s added to %s', label, gid)
106 elif label not in REQUIRED_LABELS_BY_GROUP[group]:
107 return
108 elif not is_complete(payload['object']):
109 log.info("Ignoring since %s has %s labels. "
110 "It is not complete", gid, payload['object']['labels'])
111 return
112 elif alert_type != 'new':
113 return
114 process.si(payload).apply_async(priority=priority)
117@gracedb.task(queue='superevent', shared=False, time_limit=600)
118@gracedb.catch_retryable_http_errors
119def process(payload):
120 """
121 Respond to `payload` and serially processes them to create new superevents,
122 add events to existing ones.
124 Parameters
125 ----------
126 payload : dict
127 IGWN alert payload
129 """
130 event_info = payload['object']
131 gid = event_info['graceid']
132 category = get_category(event_info)
133 t_0, t_start, t_end = get_ts(event_info)
135 if event_info.get('superevent'):
136 sid = event_info['superevent']
137 log.info('Event %s already belongs to superevent %s', gid, sid)
138 # superevent_neighbours has current and nearby superevents
139 s = event_info['superevent_neighbours'][sid]
140 superevent = _SuperEvent(s['t_start'],
141 s['t_end'],
142 s['t_0'],
143 s['superevent_id'])
144 _update_superevent(superevent.superevent_id,
145 event_info,
146 t_0=t_0,
147 t_start=None,
148 t_end=None)
149 else:
150 log.info('Event %s does not yet belong to a superevent', gid)
151 superevents = gracedb.get_superevents('category: {} {} .. {}'.format(
152 category,
153 event_info['gpstime'] - app.conf['superevent_query_d_t_start'],
154 event_info['gpstime'] + app.conf['superevent_query_d_t_end']))
155 for s in superevents:
156 if gid in s['gw_events']:
157 sid = s['superevent_id']
158 log.info('Event %s found assigned to superevent %s', gid, sid)
159 if payload['alert_type'] == 'label_added':
160 log.info('Label %s added to %s',
161 payload['data']['name'], gid)
162 elif payload['alert_type'] == 'new':
163 log.info('new alert type for %s with '
164 'existing superevent %s. '
165 'No action required', gid, sid)
166 return
167 superevent = _SuperEvent(s['t_start'],
168 s['t_end'],
169 s['t_0'],
170 s['superevent_id'])
171 _update_superevent(superevent.superevent_id,
172 event_info,
173 t_0=t_0,
174 t_start=None,
175 t_end=None)
176 break
177 else: # s not in superevents
178 event_segment = _Event(t_start, t_end, t_0, event_info['graceid'])
180 superevent = _partially_intersects(superevents,
181 event_segment)
183 if superevent:
184 sid = superevent.superevent_id
185 log.info('Event %s in window of %s. '
186 'Adding event to superevent', gid, sid)
187 gracedb.add_event_to_superevent(sid, event_segment.gid)
188 # extend the time window of the superevent
189 new_superevent = superevent | event_segment
190 if new_superevent != superevent:
191 log.info('%s not completely contained in %s, '
192 'extending superevent window',
193 event_segment.gid, sid)
194 new_t_start, new_t_end = new_superevent
196 else: # new_superevent == superevent
197 log.info('%s is completely contained in %s',
198 event_segment.gid, sid)
199 new_t_start = new_t_end = None
200 _update_superevent(superevent.superevent_id,
201 event_info,
202 t_0=t_0,
203 t_start=new_t_start,
204 t_end=new_t_end)
205 else: # not superevent
206 log.info('New event %s with no superevent in GraceDB, '
207 'creating new superevent', gid)
208 sid = gracedb.create_superevent(event_info['graceid'],
209 t_0, t_start, t_end)
211 if should_publish(event_info, significant=True):
212 gracedb.create_label.delay('ADVREQ', sid)
213 if is_complete(event_info):
214 if EARLY_WARNING_LABEL in event_info['labels']:
215 gracedb.create_label(EARLY_WARNING_LABEL, sid)
216 else:
217 gracedb.create_label(SIGNIFICANT_LABEL, sid)
219 if should_publish(event_info, significant=False):
220 if is_complete(event_info):
221 gracedb.create_label(FROZEN_LABEL, sid)
224def get_category(event):
225 """Get the superevent category for an event.
227 Parameters
228 ----------
229 event : dict
230 Event dictionary (e.g., the return value from
231 :meth:`gwcelery.tasks.gracedb.get_event`).
233 Returns
234 -------
235 {'mdc', 'test', 'production'}
237 """
238 if event.get('search') == 'MDC':
239 return 'mdc'
240 elif event['group'] == 'Test':
241 return 'test'
242 else:
243 return 'production'
246def get_ts(event):
247 """Get time extent of an event, depending on pipeline-specific parameters.
249 * For CWB and MLy, use the event's ``duration`` field.
250 * For oLIB, use the ratio of the event's ``quality_mean`` and
251 ``frequency_mean`` fields.
252 * For all other pipelines, use the
253 :obj:`~gwcelery.conf.superevent_d_t_start` and
254 :obj:`~gwcelery.conf.superevent_d_t_end` configuration values.
256 Parameters
257 ----------
258 event : dict
259 Event dictionary (e.g., the return value from
260 :meth:`gwcelery.tasks.gracedb.get_event` or
261 ``preferred_event_data`` in igwn-alert packet.)
263 Returns
264 -------
265 t_0: float
266 Segment center time in GPS seconds.
267 t_start : float
268 Segment start time in GPS seconds.
270 t_end : float
271 Segment end time in GPS seconds.
273 """
274 pipeline = event['pipeline'].lower()
275 if pipeline == 'cwb':
276 attribs = event['extra_attributes']['MultiBurst']
277 d_t_start = d_t_end = attribs['duration']
278 elif pipeline == 'olib':
279 attribs = event['extra_attributes']['LalInferenceBurst']
280 d_t_start = d_t_end = (attribs['quality_mean'] /
281 attribs['frequency_mean'])
282 elif pipeline == 'mly':
283 attribs = event['extra_attributes']['MLyBurst']
284 d_t_start = d_t_end = attribs['duration']
285 else:
286 d_t_start = app.conf['superevent_d_t_start'].get(
287 pipeline, app.conf['superevent_default_d_t_start'])
288 d_t_end = app.conf['superevent_d_t_end'].get(
289 pipeline, app.conf['superevent_default_d_t_end'])
290 return (event['gpstime'], event['gpstime'] - d_t_start,
291 event['gpstime'] + d_t_end)
294def get_snr(event):
295 """Get the SNR from the LVAlert packet.
297 Different groups and pipelines store the SNR in different fields.
299 Parameters
300 ----------
301 event : dict
302 Event dictionary (e.g., the return value from
303 :meth:`gwcelery.tasks.gracedb.get_event`, or
304 ``preferred_event_data`` in igwn-alert packet.)
306 Returns
307 -------
308 snr : float
309 The SNR.
311 """
312 group = event['group'].lower()
313 pipeline = event['pipeline'].lower()
314 if group == 'cbc':
315 attribs = event['extra_attributes']['CoincInspiral']
316 return attribs['snr']
317 elif pipeline == 'cwb':
318 attribs = event['extra_attributes']['MultiBurst']
319 return attribs['snr']
320 elif pipeline == 'olib':
321 attribs = event['extra_attributes']['LalInferenceBurst']
322 return attribs['omicron_snr_network']
323 elif pipeline == 'mly':
324 attribs = event['extra_attributes']['MLyBurst']
325 return attribs['SNR']
326 else:
327 raise NotImplementedError('SNR attribute not found')
330def get_instruments(event):
331 """Get the instruments that contributed data to an event.
333 Parameters
334 ----------
335 event : dict
336 Event dictionary (e.g., the return value from
337 :meth:`gwcelery.tasks.gracedb.get_event`, or
338 ``preferred_event_data`` in igwn-alert packet.)
340 Returns
341 -------
342 set
343 The set of instruments that contributed to the event.
345 """
346 attribs = event['extra_attributes']['SingleInspiral']
347 ifos = {single['ifo'] for single in attribs}
348 return ifos
351def get_instruments_in_ranking_statistic(event):
352 """Get the instruments that contribute to the false alarm rate.
354 Parameters
355 ----------
356 event : dict
357 Event dictionary (e.g., the return value from
358 :meth:`gwcelery.tasks.gracedb.get_event`, or
359 ``preferred_event_data`` in igwn-alert packet.)
361 Returns
362 -------
363 set
364 The set of instruments that contributed to the ranking statistic for
365 the event.
367 Notes
368 -----
369 The number of instruments that contributed *data* to an event is given by
370 the ``instruments`` key of the GraceDB event JSON structure. However, some
371 pipelines (e.g. gstlal) have a distinction between which instruments
372 contributed *data* and which were considered in the *ranking* of the
373 candidate. For such pipelines, we infer which pipelines contributed to the
374 ranking by counting only the SingleInspiral records for which the chi
375 squared field is non-empty.
377 For PyCBC Live in the O3 configuration, an empty chi^2 field does not mean
378 that the detector did not contribute to the ranking; in fact, *all*
379 detectors listed in the SingleInspiral table contribute to the significance
380 even if the chi^2 is not computed for some of them. Hence PyCBC Live is
381 handled as a special case.
383 """
384 if event['pipeline'].lower() == 'pycbc':
385 return set(event['instruments'].split(','))
386 else:
387 attribs = event['extra_attributes']['SingleInspiral']
388 return {single['ifo'] for single in attribs
389 if single.get('chisq') is not None}
392@app.task(shared=False)
393def select_pipeline_preferred_event(events):
394 """Group list of G events by pipeline, and apply :meth:`keyfunc`
395 to select the pipeline preferred events.
397 Parameters
398 ----------
399 events : list
400 list of event dictionaries
402 Returns
403 -------
404 dict
405 pipeline, graceid pairs
406 """
407 g_events = list(
408 filterfalse(lambda x: x['group'] == "External", events))
409 g_events_by_pipeline = groupby(
410 sorted(g_events, key=lambda x: x['pipeline']),
411 key=lambda x: x['pipeline']
412 )
414 return dict(
415 (k, select_preferred_event(g)) for k, g in g_events_by_pipeline)
418@app.task(shared=False)
419def select_preferred_event(events):
420 """Select the preferred event out of a list of G events, typically
421 contents of a superevent, based on :meth:`keyfunc`.
423 Parameters
424 ----------
425 events : list
426 list of event dictionaries
428 """
429 g_events = list(
430 filterfalse(lambda x: x['group'] == "External", events))
431 return max(g_events, key=keyfunc)
434def is_complete(event):
435 """
436 Determine if a G event is complete in the sense of the event
437 has its data products complete i.e. has PASTRO_READY, SKYMAP_READY,
438 EMBRIGHT_READY for CBC events and the SKYMAP_READY label for the
439 Burst events. Test events are not processed by low-latency infrastructure
440 and are always labeled complete.
442 Parameters
443 ----------
444 event : dict
445 Event dictionary (e.g., the return value from
446 :meth:`gwcelery.tasks.gracedb.get_event`, or
447 ``preferred_event_data`` in igwn-alert packet.)
449 """
450 group = event['group'].lower()
451 pipeline = event['pipeline'].lower()
452 search = event['search'].lower()
453 label_set = set(event['labels'])
454 # Define the special case burst-cwb-bbh that is a CBC
455 if pipeline == 'cwb' and search == 'bbh':
456 group = 'cbc'
458 required_labels = REQUIRED_LABELS_BY_GROUP[group]
459 return required_labels.issubset(label_set)
462def should_publish(event, significant=True):
463 """Determine whether an event should be published as a public alert.
465 All of the following conditions must be true for a public alert:
467 * The event's ``offline`` flag is not set.
468 * The event's false alarm rate, weighted by the group-specific trials
469 factor as specified by the
470 :obj:`~gwcelery.conf.preliminary_alert_trials_factor`
471 (:obj:`~gwcelery.conf.significant_alert_trials_factor`)
472 configuration setting, is less than or equal to
473 :obj:`~gwcelery.conf.preliminary_alert_far_threshold`
474 (:obj:`~gwcelery.conf.significant_alert_far_threshold`)
476 Parameters
477 ----------
478 event : dict
479 Event dictionary (e.g., the return value from
480 :meth:`gwcelery.tasks.gracedb.get_event`, or
481 ``preferred_event_data`` in igwn-alert packet.)
482 significant : bool
483 Flag to use significant
484 (:obj:`~gwcelery.conf.significant_alert_far_threshold`),
485 or less-significant
486 (:obj:`~gwcelery.conf.preliminary_alert_far_threshold`)
487 FAR threshold.
489 Returns
490 -------
491 should_publish : bool
492 :obj:`True` if the event meets the criteria for a public alert or
493 :obj:`False` if it does not.
495 """
496 return all(_should_publish(event, significant=significant))
499def _should_publish(event, significant=False):
500 """Wrapper around :meth:`should_publish`. Returns the boolean returns of
501 the publishability criteria as a tuple for later use.
502 """
503 group = event['group'].lower()
504 search = event.get('search', '').lower()
505 if search in app.conf['significant_alert_far_threshold'][group]:
506 low_signif_far_threshold = \
507 app.conf['preliminary_alert_far_threshold'][group][search]
508 low_signif_trials_factor = \
509 app.conf['preliminary_alert_trials_factor'][group][search]
510 signif_far_threshold = \
511 app.conf['significant_alert_far_threshold'][group][search]
512 signif_trials_factor = \
513 app.conf['significant_alert_trials_factor'][group][search]
515 low_signif_far = low_signif_trials_factor * event['far']
516 signif_far = signif_trials_factor * event['far']
517 else:
518 # Fallback in case an event is uploaded to an unlisted search
519 low_signif_far = -1 * float('inf')
520 signif_far = -1 * float('inf')
522 raven_coincidence = ('RAVEN_ALERT' in event['labels'])
524 # Ensure that anything that returns True for significant=True also returns
525 # True for significant=False. For example, a significant EarlyWarning event
526 # should return True for both significant=True and significant=False.
527 if significant or signif_far < signif_far_threshold:
528 far = signif_far
529 far_threshold = signif_far_threshold
530 else:
531 far = low_signif_far
532 far_threshold = low_signif_far_threshold
534 return (not event['offline'] and 'INJ' not in event['labels'],
535 far <= far_threshold or raven_coincidence)
538def keyfunc(event):
539 """Key function for selection of the preferred event.
541 Return a value suitable for identifying the preferred event. Given events
542 ``a`` and ``b``, ``a`` is preferred over ``b`` if
543 ``keyfunc(a) > keyfunc(b)``, else ``b`` is preferred.
545 Parameters
546 ----------
547 event : dict
548 Event dictionary (e.g., the return value from
549 :meth:`gwcelery.tasks.gracedb.get_event`).
551 Returns
552 -------
553 key : tuple
554 The comparison key.
556 Notes
557 -----
558 Tuples are compared lexicographically in Python: they are compared
559 element-wise until an unequal pair of elements is found.
561 """
562 group = event['group'].lower()
563 search = event['search'].lower()
564 try:
565 group_rank = app.conf['superevent_candidate_preference'][group].get(
566 search, -1
567 )
568 except KeyError:
569 group_rank = -1
570 if group == 'cbc':
571 n_ifos = len(get_instruments(event))
572 snr_or_far_ranking = get_snr(event)
573 else:
574 # We don't care about the number of detectors for burst events.
575 n_ifos = -1
576 # Smaller FAR -> higher IFAR -> more significant.
577 # Use -FAR instead of IFAR=1/FAR so that rank for FAR=0 is defined.
578 snr_or_far_ranking = -event['far']
580 # Conditions that determine choice of the preferred event
581 # event completeness comes first
582 # then, publishability criteria for significant events
583 # then, publishability criteria for less-significant events
584 # then, CBC group is given higher priority over Burst
585 # then, prioritize more number of detectors
586 # finally, use SNR (FAR) between two CBC (Burst) events
587 # See https://rtd.igwn.org/projects/gwcelery/en/latest/gwcelery.tasks.superevents.html#selection-of-the-preferred-event # noqa: E501
588 return (
589 is_complete(event),
590 *_should_publish(event, significant=True),
591 *_should_publish(event, significant=False),
592 group_rank,
593 n_ifos,
594 snr_or_far_ranking
595 )
598def _update_superevent(superevent_id, new_event_dict,
599 t_0, t_start, t_end):
600 """Update preferred event and/or change time window. Events with multiple
601 detectors take precedence over single-detector events, then CBC events take
602 precedence over burst events, and any remaining tie is broken by SNR/FAR
603 values for CBC/Burst. Single detector are not promoted to preferred event
604 status, if existing preferred event is multi-detector
606 Parameters
607 ----------
608 superevent_id : str
609 the superevent_id
610 new_event_dict : dict
611 event info of the new trigger as a dictionary
612 t_0 : float
613 center time of `superevent_id`, None for no change
614 t_start : float
615 start time of `superevent_id`, None for no change
616 t_end : float
617 end time of `superevent_id`, None for no change
619 """
620 # labels and preferred event in the IGWN alert are not the latest
621 superevent_dict = gracedb.get_superevent(superevent_id)
623 superevent_labels = superevent_dict['labels']
624 preferred_event_dict = superevent_dict['preferred_event_data']
625 kwargs = {}
626 if t_start is not None:
627 kwargs['t_start'] = t_start
628 if t_end is not None:
629 kwargs['t_end'] = t_end
630 if FROZEN_LABEL not in superevent_labels:
631 if keyfunc(new_event_dict) > keyfunc(preferred_event_dict):
632 # update preferred event when LOW_SIGNIF_LOCKED is not applied
633 kwargs['t_0'] = t_0
634 kwargs['preferred_event'] = new_event_dict['graceid']
636 if kwargs:
637 gracedb.update_superevent(superevent_id, **kwargs)
639 # completeness takes first precedence in deciding preferred event
640 # necessary and suffiecient condition to superevent as ready
641 if is_complete(new_event_dict):
642 gracedb.create_label.delay(READY_LABEL, superevent_id)
645def _superevent_segment_list(superevents):
646 """Ingests a list of superevent dictionaries, and returns a segmentlist
647 with start and end times as the duration of each segment.
649 Parameters
650 ----------
651 superevents : list
652 List of superevent dictionaries (e.g., the values
653 of field ``superevent_neighbours`` in igwn-alert packet).
655 Returns
656 -------
657 superevent_list : segmentlist
658 superevents as a segmentlist object
660 """
661 return segmentlist(
662 [_SuperEvent(s['t_start'], s['t_end'], s['t_0'], s['superevent_id'])
663 for s in superevents])
666def _partially_intersects(superevents, event_segment):
667 """Similar to :meth:`segmentlist.find` except it also returns the segment
668 of `superevents` which partially intersects argument. If there are more
669 than one intersections, first occurence is returned.
671 Parameters
672 ----------
673 superevents : list
674 list of superevents. Typical value of
675 ``superevent_neighbours.values()``.
676 event_segment : segment
677 segment object whose index is wanted
679 Returns
680 -------
681 match_segment : segment
682 segment in `self` which intersects. `None` if not found
684 """
685 # create a segmentlist using start and end times
686 superevents = _superevent_segment_list(superevents)
687 for s in superevents:
688 if s.intersects(event_segment):
689 return s
690 return None
693class _Event(segment):
694 """An event implemented as an extension of :class:`segment`."""
696 def __new__(cls, t_start, t_end, *args, **kwargs):
697 return super().__new__(cls, t_start, t_end)
699 def __init__(self, t_start, t_end, t_0, gid):
700 self.t_0 = t_0
701 self.gid = gid
704class _SuperEvent(segment):
705 """An superevent implemented as an extension of :class:`segment`."""
707 def __new__(cls, t_start, t_end, *args, **kwargs):
708 return super().__new__(cls, t_start, t_end)
710 def __init__(self, t_start, t_end, t_0, sid):
711 self.t_start = t_start
712 self.t_end = t_end
713 self.t_0 = t_0
714 self.superevent_id = sid