Coverage for gwcelery/tasks/orchestrator.py: 94%
395 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-23 18:01 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-23 18:01 +0000
1"""Tasks that comprise the alert orchestrator.
3The orchestrator is responsible for the vetting and annotation workflow to
4produce preliminary, initial, and update alerts for gravitational-wave event
5candidates.
6"""
7import json
8import re
10from astropy.time import Time
11from celery import chain, group
12from ligo.rrt_chat import channel_creation
13from rapidpe_rift_pipe import pastro as rpe_pastro
15from .. import app
16from . import (alerts, bayestar, circulars, detchar, em_bright,
17 external_skymaps, gcn, gracedb, igwn_alert, inference, p_astro,
18 rrt_utils, skymaps, superevents)
19from .core import get_first, identity
@igwn_alert.handler('superevent',
                    'mdc_superevent',
                    shared=False)
def handle_superevent(alert):
    """Dispatch a superevent IGWN alert to the matching workflow.

    The action taken depends on ``alert['alert_type']``:

    ``new``
        Launch RapidPE-RIFT and Bilby parameter estimation after their
        respective countdowns, and start the omegascans together with the
        data quality check :meth:`gwcelery.tasks.detchar.check_vectors`.
    ``label_added``
        Start the workflow that belongs to the newly added label: the
        less-significant, significant, or early-warning alerts sent through
        :meth:`~gwcelery.tasks.orchestrator.earlywarning_preliminary_alert`,
        the second preliminary alerts, the initial and retraction alerts, or
        the creation of a Mattermost channel.
    ``event_added``
        Re-run the data quality check if the superevent carries the ``DQV``
        label.

    Any other alert type is ignored.

    Parameters
    ----------
    alert : dict
        IGWN-Alert dictionary
    """
    superevent_id = alert['uid']
    alert_type = alert['alert_type']

    # launch PE and detchar based on new type superevents
    if alert_type == 'new':
        superevent = alert['object']

        def launch_pe(pe_pipeline, countdown):
            # After the countdown, fetch the preferred event, then gather
            # the lowest CBC FAR and the event record in parallel and hand
            # both to the parameter estimation task.
            (
                _get_preferred_event.si(superevent_id).set(
                    countdown=countdown
                )
                | group(
                    _get_cbc_lowest_far.si(superevent_id),
                    gracedb.get_event.s()
                )
                | parameter_estimation.s(superevent_id, pe_pipeline)
            ).apply_async()

        # The RapidPE-RIFT countdown is measured from the merger time t_0
        # rather than from now, and is never negative.
        rapidpe_countdown = max(
            superevent['t_0'] - Time.now().gps + app.conf['rapidpe_timeout'],
            0
        )
        launch_pe('rapidpe', rapidpe_countdown)
        launch_pe('bilby', app.conf['pe_timeout'])

        # run check_vectors. Create and upload omegascans
        omegascan = detchar.omegascan.si(superevent['t_0'], superevent_id)
        dq_check = detchar.check_vectors.si(
            superevent['preferred_event_data'],
            superevent_id,
            superevent['t_start'],
            superevent['t_end']
        )
        group(omegascan, dq_check).delay()

    elif alert_type == 'label_added':
        label_name = alert['data']['name']
        superevent = alert['object']

        # Restrict the event query to the category of the superevent.
        query = f'superevent: {superevent_id} group: CBC Burst'
        query += {'MDC': ' MDC', 'Test': ' Test'}.get(
            superevent['category'], '')

        def locked_chain():
            # ensure superevent is locked before starting an alert workflow:
            # the freeze step is only added when the label is still missing
            head = chain()
            if superevents.FROZEN_LABEL not in superevent['labels']:
                head |= gracedb.create_label.si(superevents.FROZEN_LABEL,
                                                superevent_id)
            return head

        # launch less-significant preliminary alerts on LOW_SIGNIF_LOCKED
        if label_name == superevents.FROZEN_LABEL:
            # don't launch if EARLY_WARNING or SIGNIF_LOCKED is present
            skipping_labels = {
                superevents.SIGNIFICANT_LABEL,
                superevents.EARLY_WARNING_LABEL
            }.intersection(superevent['labels'])
            if skipping_labels:
                gracedb.upload.delay(
                    None, None, superevent_id,
                    "The superevent already has a significant/EW event, "
                    "skipping launching less-significant alert"
                )
                return
            (
                gracedb.upload.s(
                    None,
                    None,
                    superevent_id,
                    "Automated DQ check before sending less-significant "
                    "preliminary alert. New results supersede old results.",
                    tags=['data_quality']
                )
                | detchar.check_vectors.si(
                    superevent['preferred_event_data'],
                    superevent_id,
                    superevent['t_start'],
                    superevent['t_end']
                )
                | earlywarning_preliminary_alert.s(
                    alert, alert_type='less-significant')
            ).apply_async()

        # launch significant alert on SIGNIF_LOCKED
        elif label_name == superevents.SIGNIFICANT_LABEL:
            r = locked_chain()
            r |= (
                gracedb.get_events.si(query)
                | superevents.select_preferred_event.s()
                | _update_superevent_and_return_event_dict.s(superevent_id)
                | _leave_log_message_and_return_event_dict.s(
                    superevent_id,
                    "Superevent cleaned up after significant event. "
                )
                | _leave_log_message_and_return_event_dict.s(
                    superevent_id,
                    "Automated DQ check before sending significant alert. "
                    "New results supersede old results.",
                    tags=['data_quality']
                )
                | detchar.check_vectors.s(
                    superevent_id,
                    superevent['t_start'],
                    superevent['t_end']
                )
                | earlywarning_preliminary_alert.s(
                    alert, alert_type='preliminary')
            )
            r.apply_async()

        # launch second preliminary on GCN_PRELIM_SENT
        elif label_name == 'GCN_PRELIM_SENT':
            clean_up_timeout = app.conf['superevent_clean_up_timeout']
            (
                # https://git.ligo.org/emfollow/gwcelery/-/issues/478
                # FIXME: remove this task once
                # https://github.com/celery/celery/issues/7851 is resolved
                identity.si().set(countdown=clean_up_timeout)
                | gracedb.get_events.si(query)
                | superevents.select_preferred_event.s()
                | _update_superevent_and_return_event_dict.s(superevent_id)
                | group(
                    _leave_log_message_and_return_event_dict.s(
                        superevent_id,
                        "Superevent cleaned up after first preliminary alert"
                    ),
                    gracedb.create_label.si('DQR_REQUEST', superevent_id)
                )
                | get_first.s()
                | earlywarning_preliminary_alert.s(
                    alert, alert_type='preliminary')
            ).apply_async()

            # set pipeline preferred events
            # FIXME: Ideally this should combined with the previous canvas.
            # However, incorporating that group prevents canvas from executing
            # maybe related to https://github.com/celery/celery/issues/7851
            (
                gracedb.get_events.si(query)
                | superevents.select_pipeline_preferred_event.s()
                | _set_pipeline_preferred_events.s(superevent_id)
            ).apply_async(countdown=app.conf['superevent_clean_up_timeout'])

        elif label_name == 'LOW_SIGNIF_PRELIM_SENT':
            # similar workflow as the GCN_PRELIM_SENT
            # except block by condition evaluated at the end of timeout
            _revise_and_send_second_less_significant_alert.si(
                alert, query, superevent_id,
            ).apply_async(countdown=app.conf['superevent_clean_up_timeout'])

        elif label_name == superevents.EARLY_WARNING_LABEL:
            if superevents.SIGNIFICANT_LABEL in superevent['labels']:
                # stop if full BW significant event already present
                gracedb.upload.delay(
                    None, None, superevent_id,
                    "Superevent superseded by full BW event, skipping EW."
                )
                return
            # start the EW alert pipeline; is blocked by SIGNIF_LOCKED
            r = locked_chain()
            r |= (
                gracedb.get_events.si(query)
                | superevents.select_preferred_event.s()
                | _update_superevent_and_return_event_dict.s(superevent_id)
                | _leave_log_message_and_return_event_dict.s(
                    superevent_id,
                    "Superevent cleaned up before sending EW alert."
                )
                | earlywarning_preliminary_alert.s(
                    alert, alert_type='earlywarning')
            )
            r.apply_async()

        # launch initial/retraction alert on ADVOK/ADVNO
        elif label_name == 'ADVOK':
            initial_alert((None, None, None), alert)
        elif label_name == 'ADVNO':
            retraction_alert(alert)
        elif label_name == 'ADVREQ':
            if app.conf['create_mattermost_channel']:
                _create_mattermost_channel.si(superevent_id).delay()

    # check DQV label on superevent, run check_vectors if required
    elif alert_type == 'event_added':
        # FIXME Check if this should be changed to 'update' alert_types instead
        # of 'event_added'. 'event_added' seems like it's just a new event in
        # the superevent window, not necessarily an event that should be
        # promoted to preferred event
        window = alert['data']
        start = window['t_start']
        end = window['t_end']

        if 'DQV' not in gracedb.get_labels(superevent_id):
            return
        (
            detchar.check_vectors.s(
                alert['object']['preferred_event_data'],
                superevent_id,
                start,
                end
            )
            | _update_if_dqok.s(superevent_id)
        ).apply_async()
@igwn_alert.handler('cbc_gstlal',
                    'cbc_spiir',
                    'cbc_pycbc',
                    'cbc_mbta',
                    shared=False)
def handle_cbc_event(alert):
    """Perform annotations for CBC events that depend on pipeline-specific
    matched-filter parameter estimates.

    Notes
    -----
    This IGWN alert message handler acts on new uploads. It also generates
    pipeline.p_astro.json information for pipelines that do not provide
    such information. Events from the VT search are not annotated.

    The table below lists which files are created as a result of a new upload,
    and which tasks generate them.

    ============================== ==================================================
    File                           Task
    ============================== ==================================================
    ``bayestar.multiorder.fits``   :meth:`gwcelery.tasks.bayestar.localize`
    ``em_bright.json``             :meth:`gwcelery.tasks.em_bright.source_properties`
    ``pipeline.p_astro.json``      :meth:`gwcelery.tasks.p_astro.compute_p_astro`
    ============================== ==================================================

    """  # noqa: E501
    graceid = alert['uid']
    event = alert['object']
    pipeline = event['pipeline'].lower()
    search = event['search'].lower()

    # no annotations for events used in VT analysis
    if search == superevents.VT_SEARCH_NAME.lower():
        return

    priority = 0 if superevents.should_publish(event) else 1

    # Everything below is only done once, when the event is first uploaded.
    if alert['alert_type'] != 'new':
        return

    # Pipelines that use the GWCelery p-astro method
    # - spiir (all searches)
    # - pycbc for EarlyWarning search
    # - periodic MDC generated by first-two-years (based on gstlal)
    # FIXME: Remove this once all pipelines compute their own p-astro
    pipelines_stock_p_astro = {('spiir', 'earlywarning'),
                               ('pycbc', 'earlywarning'),
                               ('gstlal', 'mdc')}

    # Inputs of the em_bright and p_astro calculations; masses and spins are
    # read from the first SingleInspiral entry.
    instruments = superevents.get_instruments_in_ranking_statistic(event)
    extra_attributes = event['extra_attributes']
    snr = superevents.get_snr(event)
    far = event['far']
    template = extra_attributes['SingleInspiral'][0]
    mass1 = template['mass1']
    mass2 = template['mass2']
    chi1 = template['spin1z']
    chi2 = template['spin2z']

    # FIXME: remove conditional when em-bright implementation exists
    if search != superevents.SUBSOLAR_SEARCH_NAME.lower():
        (
            em_bright.source_properties.si(mass1, mass2, chi1, chi2, snr)
            | gracedb.upload.s(
                'em_bright.json', graceid,
                'em bright complete', ['em_bright', 'public']
            )
            | gracedb.create_label.si('EMBRIGHT_READY', graceid)
        ).apply_async(priority=priority)

    # p_astro calculation for pipelines that do not provide a
    # stock p_astro (upload pipeline.p_astro.json)
    if (pipeline, search) in pipelines_stock_p_astro:
        (
            p_astro.compute_p_astro.s(snr,
                                      far,
                                      mass1,
                                      mass2,
                                      pipeline,
                                      instruments)
            | gracedb.upload.s(
                f'{pipeline}.p_astro.json', graceid,
                'p_astro computation complete', ['p_astro', 'public']
            )
            | gracedb.create_label.si('PASTRO_READY', graceid)
        ).apply_async(priority=priority)

    # Start BAYESTAR for all CBC pipelines.
    (
        gracedb.download.s('coinc.xml', graceid)
        | bayestar.localize.s(graceid)
        | gracedb.upload.s(
            'bayestar.multiorder.fits', graceid,
            'sky localization complete', ['sky_loc', 'public']
        )
        | gracedb.create_label.si('SKYMAP_READY', graceid)
    ).apply_async(priority=priority)
@igwn_alert.handler('burst_olib',
                    'burst_cwb',
                    'burst_mly',
                    shared=False)
def handle_burst_event(alert):
    """Perform annotations for burst events that depend on pipeline-specific
    data.

    Notes
    -----
    Two kinds of alerts are acted upon:

    * A ``new`` upload from the cWB pipeline in the BBH search: source
      properties are computed with
      :meth:`gwcelery.tasks.em_bright.source_properties`, uploaded as
      ``em_bright.json``, and the ``EMBRIGHT_READY`` label is applied.
    * A ``log`` alert for a file ending in ``.fits.gz``: the file is
      downloaded, passed through :meth:`gwcelery.tasks.skymaps.unflatten`,
      uploaded as ``*.multiorder.fits``, and the ``SKYMAP_READY`` label is
      applied.

    Parameters
    ----------
    alert : dict
        IGWN-Alert dictionary
    """
    graceid = alert['uid']
    pipeline = alert['object']['pipeline'].lower()
    search = alert['object']['search'].lower()
    priority = 0 if superevents.should_publish(alert['object']) else 1

    # em_bright calculation for Burst-cWB-BBH
    if alert['alert_type'] == 'new':
        if (pipeline, search) in [('cwb', 'bbh')]:
            extra_attributes = alert['object']['extra_attributes']
            multiburst = extra_attributes['MultiBurst']
            snr = multiburst.get('snr')
            # FIXME once ingestion of mchirp is finalised.
            # A missing, null, or zero mchirp carries no mass information.
            # A null value must be caught here: it would otherwise reach the
            # multiplication below and raise TypeError.
            mchirp = multiburst.get('mchirp') or 0.0
            if mchirp == 0.0:
                m12 = 30.0
            else:
                # component mass of an equal-mass binary with this chirp mass
                m12 = 2**(0.2) * mchirp
            (
                em_bright.source_properties.si(m12, m12, 0.0, 0.0, snr)
                |
                gracedb.upload.s(
                    'em_bright.json', graceid,
                    'em bright complete', ['em_bright', 'public']
                )
                |
                gracedb.create_label.si('EMBRIGHT_READY', graceid)
            ).apply_async(priority=priority)

    if alert['alert_type'] != 'log':
        return

    filename = alert['data']['filename']

    # Pipeline is uploading a flat resolution skymap file.
    # Converting to a multiorder one with proper name.
    # FIXME: Remove block when CWB starts to upload skymaps
    # in multiorder format
    if filename.endswith('.fits.gz'):
        new_filename = filename.replace('.fits.gz', '.multiorder.fits')
        flatten_msg = (
            'Multi-resolution FITS file created from '
            '<a href="/api/events/{graceid}/files/'
            '{filename}">{filename}</a>').format(
                graceid=graceid, filename=filename)
        tags = ['sky_loc', 'lvem', 'public']
        (
            gracedb.download.si(filename, graceid)
            |
            skymaps.unflatten.s(new_filename)
            |
            gracedb.upload.s(
                new_filename, graceid, flatten_msg, tags)
            |
            gracedb.create_label.si('SKYMAP_READY', graceid)
        ).apply_async(priority=priority)
@igwn_alert.handler('superevent',
                    'mdc_superevent',
                    shared=False)
def handle_posterior_samples(alert):
    """Generate multi-resolution and flat-resolution FITS files and skymaps
    from an uploaded HDF5 file containing posterior samples.

    Only ``log`` alerts for files ending in ``.posterior_samples.hdf5`` are
    processed. The part of the filename before ``.posterior_samples.`` is
    used as the prefix of the generated ``*.multiorder.fits`` and
    ``*.em_bright.json`` files.

    Parameters
    ----------
    alert : dict
        IGWN-Alert dictionary
    """
    if alert['alert_type'] != 'log' or \
            not alert['data']['filename'].endswith('.posterior_samples.hdf5'):
        return
    superevent_id = alert['uid']
    filename = alert['data']['filename']
    info = '{} {}'.format(alert['data']['comment'], filename)
    # Split on the last occurrence only, so that a prefix which itself
    # contains '.posterior_samples.' does not break the two-name unpacking.
    prefix, _ = filename.rsplit('.posterior_samples.', 1)
    skymap_filename = f'{prefix}.multiorder.fits'
    labels = ['pe', 'sky_loc']
    ifos = superevents.get_instruments(alert['object']['preferred_event_data'])

    # sky map from the posterior samples: annotate and upload in parallel
    (
        gracedb.download.si(filename, superevent_id)
        |
        skymaps.skymap_from_samples.s(superevent_id, ifos)
        |
        group(
            skymaps.annotate_fits.s(
                skymap_filename, superevent_id, labels
            ),

            gracedb.upload.s(
                skymap_filename, superevent_id,
                'Multiresolution FITS file generated from "{}"'.format(info),
                labels
            )
        )
    ).delay()

    # em_bright from LALInference posterior samples
    (
        gracedb.download.si(filename, superevent_id)
        |
        em_bright.em_bright_posterior_samples.s()
        |
        gracedb.upload.s(
            '{}.em_bright.json'.format(prefix), superevent_id,
            'em-bright computed from "{}"'.format(info),
            'pe'
        )
    ).delay()
@app.task(bind=True, shared=False)
def _create_mattermost_channel(self, superevent_id):
    """Create the Mattermost channel for a superevent.

    Scheduled by the superevent handler when the ADVREQ label is applied.
    The work is delegated to
    :func:`ligo.rrt_chat.channel_creation.rrt_channel_creation`, which
    receives the superevent id and the configured GraceDB host.

    Parameters
    ----------
    superevent_id: str
        The superevent id
    """
    channel_creation.rrt_channel_creation(
        superevent_id, self.app.conf['gracedb_host'])
@app.task(shared=False)
def _set_pipeline_preferred_events(pipeline_event, superevent_id):
    """Set the pipeline preferred events of a superevent.

    :meth:`gracedb.add_pipeline_preferred_event` is invoked for the
    ``graceid`` of every event in `pipeline_event`, and the invocations are
    wrapped in a :class:`celery.group` that is returned.

    Parameters
    ----------
    pipeline_event: dict
        {pipeline: event_dict} key value pairs, returned by
        :meth:`superevents.select_pipeline_preferred_event`.

    superevent_id: str
        The superevent id
    """
    # NOTE(review): the task is called directly here, not through
    # ``.s()``/``.si()``, so the group is built from the call results rather
    # than from signatures -- confirm this is intended.
    # Kept as a generator so that evaluation stays lazy.
    updates = (
        gracedb.add_pipeline_preferred_event(superevent_id,
                                             preferred['graceid'])
        for preferred in pipeline_event.values()
    )
    return group(updates)
@app.task(shared=False, ignore_result=True)
def _update_if_dqok(event, superevent_id):
    """Promote `event` to preferred event of `superevent_id` if the `DQOK`
    label is present on the superevent.

    When the label is present, the preferred event and ``t_0`` of the
    superevent are updated from `event` and a log message is left on the
    superevent. Otherwise nothing happens.
    """
    if 'DQOK' not in gracedb.get_labels(superevent_id):
        return

    event_id = event['graceid']
    gracedb.update_superevent(superevent_id,
                              preferred_event=event_id,
                              t_0=event["gpstime"])
    gracedb.upload.delay(
        None, None, superevent_id,
        comment=f'DQOK applied based on new event {event_id}')
@gracedb.task(shared=False)
def _get_preferred_event(superevent_id):
    """Look up the preferred event of a superevent in GraceDB.

    Behaves like :func:`gwcelery.tasks.gracedb.get_superevent`, but only the
    ``preferred_event`` entry of the GraceDB JSON response is returned.
    """
    # FIXME: remove ._orig_run when this bug is fixed:
    # https://github.com/getsentry/sentry-python/issues/370
    superevent = gracedb.get_superevent._orig_run(superevent_id)
    return superevent['preferred_event']
@gracedb.task(shared=False)
def _create_voevent(classification, *args, **kwargs):
    r"""Create a VOEvent record, merging in source classification and
    source properties given as JSON.

    Parameters
    ----------
    classification : tuple, None
        A collection of JSON strings, generated by
        :meth:`gwcelery.tasks.em_bright.source_properties` and
        :meth:`gwcelery.tasks.p_astro.compute_p_astro` or
        content of ``{gstlal,mbta}.p_astro.json`` uploaded
        by {gstlal,mbta} respectively; or None.
        Entries that are None are skipped.
    \*args
        Additional positional arguments passed to
        :meth:`gwcelery.tasks.gracedb.create_voevent`.
    \*\*kwargs
        Additional keyword arguments passed to
        :meth:`gwcelery.tasks.gracedb.create_voevent`.

    Returns
    -------
    str
        The filename of the newly created VOEvent.

    """
    # Work on a copy so the caller's mapping is left alone.
    kwargs = dict(kwargs)

    if classification is not None:
        # Merge source classification and source properties into kwargs.
        # Ignore filenames, only load dict in bytes form
        payloads = (text for text in classification if text is not None)
        for payload in payloads:
            kwargs.update(json.loads(payload))

    # FIXME: These keys differ between em_bright.json
    # and the GraceDB REST API.
    renamed_keys = (('HasNS', 'ProbHasNS'), ('HasRemnant', 'ProbHasRemnant'))
    for json_key, api_key in renamed_keys:
        if json_key in kwargs:
            kwargs[api_key] = kwargs.pop(json_key)

    # Unless given explicitly, the sky map type is the sky map filename
    # stripped of its FITS extension and of any trailing version number.
    skymap_filename = kwargs.get('skymap_filename')
    if skymap_filename is not None:
        kwargs.setdefault(
            'skymap_type',
            re.sub(r'(\.multiorder)?\.fits(\..+)?(,[0-9]+)?$', '',
                   skymap_filename))

    # FIXME: remove ._orig_run when this bug is fixed:
    # https://github.com/getsentry/sentry-python/issues/370
    return gracedb.create_voevent._orig_run(*args, **kwargs)
@app.task(shared=False)
def _create_label_and_return_filename(filename, label, graceid):
    """Schedule :meth:`gracedb.create_label` for `label` on `graceid`
    and pass `filename` through unchanged.
    """
    gracedb.create_label.delay(label, graceid)
    return filename
@app.task(shared=False)
def _leave_log_message_and_return_event_dict(event, superevent_id,
                                             message, **kwargs):
    """Schedule a log message on the superevent via :meth:`gracedb.upload`
    and pass the event dictionary through unchanged.

    Additional keyword arguments are forwarded to :meth:`gracedb.upload`.
    """
    gracedb.upload.delay(None, None, superevent_id, message, **kwargs)
    return event
@gracedb.task(shared=False)
def _update_superevent_and_return_event_dict(event, superevent_id):
    """Make `event` the preferred event of the superevent through
    :meth:`gracedb.update_superevent`, setting ``t_0`` to its GPS time,
    and pass the event dictionary through unchanged.
    """
    new_state = {
        'preferred_event': event['graceid'],
        't_0': event['gpstime'],
    }
    gracedb.update_superevent(superevent_id, **new_state)
    return event
@gracedb.task(shared=False)
def _proceed_if_not_blocked_by(files, superevent_id, block_by):
    """Pass `files` through unless the superevent has a label in `block_by`.

    Parameters
    ----------
    files : tuple
        List of files
    superevent_id : str
        The superevent id corresponding to files
    block_by : set
        Set of blocking labels. E.g. `{'ADVOK', 'ADVNO'}`

    Returns
    -------
    tuple, None
        `files` when no blocking label is present. Otherwise None, after a
        log message naming the blocking labels has been scheduled.
    """
    blocking_labels = block_by.intersection(
        gracedb.get_labels(superevent_id))
    if not blocking_labels:
        return files

    gracedb.upload.delay(
        None, None, superevent_id,
        f"Blocking automated notice due to labels {blocking_labels}"
    )
    return None
@gracedb.task(shared=False)
def _revise_and_send_second_less_significant_alert(alert, query,
                                                   superevent_id):
    """Revise the superevent and send the second less-significant alert.

    Nothing is done when the superevent carries an advocate label
    (``ADVREQ``, ``ADVOK``, ``ADVNO``), the significant label, or the early
    warning label. Otherwise the preferred event is selected again from the
    events matching `query`, the superevent is updated, and a
    less-significant alert is sent; the pipeline preferred events are set
    in a separate canvas.

    Parameters
    ----------
    alert : dict
        IGWN-Alert dictionary
    query : str
        GraceDB query selecting the events of the superevent
    superevent_id : str
        The superevent id
    """
    blocking_labels = {
        'ADVREQ', 'ADVOK', 'ADVNO',
        superevents.SIGNIFICANT_LABEL,
        superevents.EARLY_WARNING_LABEL,
    }
    current_labels = gracedb.get_labels(superevent_id)
    if not blocking_labels.isdisjoint(current_labels):
        return

    (
        gracedb.get_events.si(query)
        | superevents.select_preferred_event.s()
        | _update_superevent_and_return_event_dict.s(superevent_id)
        | _leave_log_message_and_return_event_dict.s(
            superevent_id,
            "Superevent cleaned up before second less-significant alert"
        )
        | earlywarning_preliminary_alert.s(
            alert, alert_type='less-significant')
    ).delay()

    # set pipeline preferred events
    # FIXME: Ideally this should combined with the previous canvas.
    # However, incorporating that group prevents canvas from executing
    # maybe related to https://github.com/celery/celery/issues/7851
    (
        gracedb.get_events.si(query)
        | superevents.select_pipeline_preferred_event.s()
        | _set_pipeline_preferred_events.s(superevent_id)
    ).delay()
@app.task(shared=False)
def _annotate_fits_and_return_input(input_list, superevent_id):
    """Annotate the downloaded sky map and pass the input through.

    Takes the first entry of the output of the skymap, embright, p-astro
    download group at the beginning of the
    :meth:`~gwcelery.tasks.orchestrator.earlywarning_preliminary_alert`
    canvas and hands it to
    :meth:`~gwcelery.tasks.skymaps.annotate_fits_tuple`.

    Parameters
    ----------
    input_list : list
        The output of the group that downloads the skymap, embright, and
        p-astro files. This list is in the form [skymap, skymap_filename],
        [em_bright, em_bright_filename], [p_astro_dict, p_astro_filename],
        though the em-bright and p-astro lists can be populated by Nones
    superevent_id : str
        The superevent id

    Returns
    -------
    list
        `input_list`, unchanged.
    """
    skymap_entry = input_list[0]
    annotation_tags = ['sky_loc', 'public']
    skymaps.annotate_fits_tuple(skymap_entry, superevent_id, annotation_tags)

    return input_list
@gracedb.task(shared=False)
def _unpack_args_and_send_earlywarning_preliminary_alert(input_list, alert,
                                                         alert_type):
    """Split the downloaded files into contents and filenames and schedule
    :meth:`gwcelery.tasks.orchestrator.earlywarning_preliminary_initial_update_alert`.

    The input is the output of the skymap, embright, p-astro download group
    at the beginning of the
    :meth:`~gwcelery.tasks.orchestrator.earlywarning_preliminary_alert`
    canvas.

    Parameters
    ----------
    input_list : list
        The output of the group that downloads the skymap, embright, and
        p-astro files. This list is in the form [skymap, skymap_filename],
        [em_bright, em_bright_filename], [p_astro_dict, p_astro_filename],
        though the em-bright and p-astro lists can be populated by Nones.
        None if the alert is blocked by blocking labels.
    alert : dict
        IGWN-Alert dictionary
    alert_type : str
        alert_type passed to
        :meth:`earlywarning_preliminary_initial_update_alert`
    """
    if input_list is None:  # alert is blocked by blocking labels
        return

    skymap_pair, em_bright_pair, p_astro_pair = input_list
    skymap_contents, skymap_name = skymap_pair
    em_bright_contents, em_bright_name = em_bright_pair
    p_astro_contents, p_astro_name = p_astro_pair

    # Update to latest state after downloading files
    superevent = gracedb.get_superevent(alert['object']['superevent_id'])

    earlywarning_preliminary_initial_update_alert.delay(
        [skymap_name, em_bright_name, p_astro_name],
        superevent, alert_type,
        filecontents=[skymap_contents, em_bright_contents, p_astro_contents]
    )
@app.task(ignore_result=True, shared=False)
def earlywarning_preliminary_alert(event, alert, alert_type='preliminary',
                                   initiate_voevent=True):
    """Produce a preliminary alert by copying any sky maps.

    This consists of the following steps:

    1.  Copy any sky maps and source classification from the preferred event
        to the superevent.
    2.  Create standard annotations for sky maps including all-sky plots by
        calling :meth:`gwcelery.tasks.skymaps.annotate_fits`.
    3.  Create a preliminary VOEvent.
    4.  Send the VOEvent to GCN and notices to SCiMMA and GCN.
    5.  Apply the GCN_PRELIM_SENT or LOW_SIGNIF_PRELIM_SENT
        depending on the significant or less-significant alert
        respectively.
    6.  Create and upload a GCN Circular draft.

    Steps 3 onwards are scheduled only when the event is publishable,
    `initiate_voevent` is true, and no blocking label is present on the
    superevent.

    Parameters
    ----------
    event : dict
        The preferred event dictionary
    alert : dict
        IGWN-Alert dictionary
    alert_type : str
        The alert type, ``'preliminary'`` by default
    initiate_voevent : bool
        Whether the notice-sending part of the canvas may be scheduled

    Raises
    ------
    NotImplementedError
        If the event is neither a CBC nor a Burst event.
    """
    priority = 0 if superevents.should_publish(event) else 1
    preferred_event_id = event['graceid']
    superevent_id = alert['uid']

    # Define alert payloads depending on group-pipeline-search
    # cbc-*-*       : p_astro/em_bright
    # burst.cwb-bbh : p_astro/em_bright
    # burst-*-*     : NO p_astro/em_bright
    alert_group = event['group'].lower()
    alert_pipeline = event['pipeline'].lower()
    alert_search = event['search'].lower()
    pipeline_skymap = alert_pipeline + '.multiorder.fits'
    pipeline_p_astro = alert_pipeline + '.p_astro.json'
    if (alert_pipeline, alert_search) == ('cwb', 'bbh'):
        skymap_filename = pipeline_skymap
        p_astro_filename = pipeline_p_astro
        em_bright_filename = 'em_bright.json'
    elif alert_group == 'cbc':
        skymap_filename = 'bayestar.multiorder.fits'
        p_astro_filename = pipeline_p_astro
        em_bright_filename = 'em_bright.json'
    elif alert_group == 'burst':
        skymap_filename = pipeline_skymap
        p_astro_filename = None
        em_bright_filename = None
    else:
        raise NotImplementedError(
            'Valid skymap required for preliminary alert'
        )

    # Determine if the event should be made public.
    is_publishable = (
        superevents.should_publish(
            event, significant=alert_type != 'less-significant')
        and {'DQV', 'INJ'}.isdisjoint(event['labels'])
    )

    def copy_to_superevent(filename, message, tags, label):
        # Sub-canvas yielding [file contents, filename]: download the file
        # from the preferred event, then pass the contents along while
        # uploading them to the superevent and applying `label`.
        # Without a file to copy, yield the [None, None] placeholder.
        if filename is None:
            return identity.s([None, None])
        return (
            gracedb.download.si(filename, preferred_event_id)
            |
            group(
                identity.s(),

                gracedb.upload.s(
                    filename,
                    superevent_id,
                    message=message.format(preferred_event_id),
                    tags=tags
                )
                |
                _create_label_and_return_filename.s(label, superevent_id)
            )
        )

    # Download files from events and upload to superevent with relevant
    # annotations. Pass file contents down the chain so alerts task doesn't
    # need to download them again.
    # Note: this is explicitly made a chain to fix an issue described in #464.
    downloads = group(
        copy_to_superevent(
            skymap_filename, 'Localization copied from {}',
            ['sky_loc', 'public'], 'SKYMAP_READY'),
        copy_to_superevent(
            em_bright_filename, 'Source properties copied from {}',
            ['em_bright', 'public'], 'EMBRIGHT_READY'),
        copy_to_superevent(
            p_astro_filename, 'Source classification copied from {}',
            ['p_astro', 'public'], 'PASTRO_READY')
    )
    canvas = chain(
        downloads
        |
        # Need annotate skymap task in body of chord instead of header because
        # this task simply calls another task, which is to be avoided in chord
        # headers. Note that any group that chains to a task is automatically
        # upgraded to a chord.
        _annotate_fits_and_return_input.s(superevent_id)
    )

    # Switch for disabling all but MDC alerts.
    if app.conf['only_alert_for_mdc'] and event.get('search') != 'MDC':
        canvas |= gracedb.upload.si(
            None, None, superevent_id,
            ("Skipping alert because gwcelery has been configured to only"
             " send alerts for MDC events."))
        canvas.apply_async(priority=priority)
        return

    # Send notice and upload GCN circular draft for online events.
    if is_publishable and initiate_voevent:
        # presence of advocate action blocks significant prelim alert
        # presence of adv action or significant event blocks EW alert
        # presence of adv action or significant event or EW event blocks
        # less significant alert
        blocking_labels_by_type = {
            'preliminary': {'ADVOK', 'ADVNO'},
            'earlywarning': {
                superevents.SIGNIFICANT_LABEL, 'ADVOK', 'ADVNO'},
            'less-significant': {
                superevents.EARLY_WARNING_LABEL,
                superevents.SIGNIFICANT_LABEL,
                'ADVOK', 'ADVNO'},
        }
        blocking_labels = blocking_labels_by_type.get(alert_type, set())
        canvas |= (
            _proceed_if_not_blocked_by.s(superevent_id, blocking_labels)
            |
            _unpack_args_and_send_earlywarning_preliminary_alert.s(
                alert, alert_type
            )
        )

    canvas.apply_async(priority=priority)
@gracedb.task(shared=False)
def _get_cbc_lowest_far(superevent_id):
    """Obtain the lowest FAR of the CBC events in the target superevent.

    Returns None if the superevent contains no CBC event.
    """
    # FIXME: remove ._orig_run when this bug is fixed:
    # https://github.com/getsentry/sentry-python/issues/370
    superevent = gracedb.get_superevent._orig_run(superevent_id)
    # Fetch every member event before filtering on the group.
    events = [
        gracedb.get_event._orig_run(gid) for gid in superevent["gw_events"]
    ]
    cbc_fars = (e['far'] for e in events if e['group'].lower() == 'cbc')
    return min(cbc_fars, default=None)
@app.task(ignore_result=True, shared=False)
def parameter_estimation(far_event, superevent_id, pe_pipeline):
    """Parameter Estimation with Bilby and RapidPE-RIFT. Parameter estimation
    runs are triggered for CBC triggers which pass the FAR threshold and are
    not mock uploads. For those which do not pass these criteria, this task
    uploads messages explaining why parameter estimation is not started.

    Parameters
    ----------
    far_event : tuple
        Pair of the FAR to compare against the threshold and the event
        dictionary
    superevent_id : str
        The superevent id
    pe_pipeline : str
        Name of the parameter estimation pipeline, e.g. ``'bilby'`` or
        ``'rapidpe'``
    """
    far, event = far_event
    event_group = event['group'].lower()
    search = event['search'].lower()

    far_thresholds = app.conf['significant_alert_far_threshold']['cbc']
    if search in far_thresholds:
        trials_factors = app.conf['significant_alert_trials_factor']['cbc']
        threshold = far_thresholds[search] / trials_factors[search]
    else:
        # Fallback in case an event is uploaded to an unlisted search
        threshold = -float('inf')

    # Find the first reason, if any, not to start parameter estimation.
    if event_group != 'cbc':
        reason = ('Parameter estimation will not start since this is not '
                  'CBC but {}.'.format(event['group']))
    elif far > threshold:
        reason = ('Parameter estimation will not start since FAR is larger '
                  'than the PE threshold, {} Hz.'.format(threshold))
    elif search == 'mdc':
        reason = ('Parameter estimation will not start since parameter '
                  'estimation is disabled for mock uploads.')
    elif event.get('offline', False):
        reason = ('Parameter estimation will not start since parameter '
                  'estimation is disabled for OFFLINE events.')
    elif (
        app.conf['gracedb_host'] == 'gracedb-playground.ligo.org'
        and event['pipeline'] == 'MBTA'
    ):
        # FIXME: Remove this block once multiple channels can be handled on
        # one gracedb instance
        reason = ('Parameter estimation is disabled for MBTA triggers '
                  'on playground as MBTA analyses live data + online '
                  'injections not O3 replay data + MDC injections')
    elif pe_pipeline == 'rapidpe' and search == 'earlywarning':
        # Remove this if rapidpe can ingest early warning events
        reason = ('Parameter estimation by RapidPE-RIFT is disabled for '
                  'earlywarning triggers.')
    else:
        inference.start_pe.delay(event, superevent_id, pe_pipeline)
        return

    gracedb.upload.delay(
        filecontents=None, filename=None,
        graceid=superevent_id,
        message=reason,
        tags='pe'
    )
@gracedb.task(shared=False)
def earlywarning_preliminary_initial_update_alert(
    filenames,
    superevent,
    alert_type,
    filecontents=None
):
    """
    Create a canvas that sends an earlywarning, preliminary, initial, or update
    notice.

    Nothing is built or sent if the superevent carries the ``INJ`` label.
    Otherwise the canvas is assembled and launched with ``apply_async``; this
    function itself returns None.

    Parameters
    ----------
    filenames : tuple
        A list of the sky map, em_bright, and p_astro filenames. Any entry
        that is None is looked up in the superevent's GraceDB log.
    superevent : dict
        The superevent dictionary, typically obtained from an IGWN Alert or
        from querying GraceDB. The keys ``labels``, ``superevent_id`` and
        ``em_type`` are read here.
    alert_type : {'less-significant', 'earlywarning', 'preliminary', 'initial', 'update'}  # noqa: E501
        The alert type.
    filecontents : tuple, optional
        The sky map, em_bright, and p_astro file contents, in that order.
        Only accepted (asserted) for less-significant, earlywarning and
        preliminary alerts. When given, and when no combined sky map filename
        has been determined, these contents are used directly instead of
        downloading the em_bright and p_astro files from GraceDB.

    Notes
    -----
    Tasks that call this function should be decorated with
    :obj:`gwcelery.tasks.gracedb.task` rather than :obj:`gwcelery.app.task` so
    that a synchronous call to :func:`gwcelery.tasks.gracedb.get_log` is
    retried in the event of GraceDB API failures.

    For earlywarning, preliminary and initial alerts a circular template is
    uploaded. If the superevent has the `RAVEN_ALERT` label and a non-empty
    ``em_type``, the EM coincidence (RAVEN) circular template is created
    instead of the ordinary one.

    """
    labels = superevent['labels']
    superevent_id = superevent['superevent_id']

    # No alert of any kind is sent for superevents labelled INJ.
    if 'INJ' in labels:
        return

    if filecontents:
        assert alert_type in {
            'less-significant', 'earlywarning', 'preliminary'}

    skymap_filename, em_bright_filename, p_astro_filename = filenames
    rapidpe_pastro_filename = None
    # The RapidPE-RIFT p_astro file is always searched for in the log.
    rapidpe_pastro_needed = True
    combined_skymap_filename = None
    combined_skymap_needed = False
    # A file has to be looked up in the log only if the caller did not name it.
    skymap_needed = (skymap_filename is None)
    em_bright_needed = (em_bright_filename is None)
    p_astro_needed = (p_astro_filename is None)
    raven_coinc = ('RAVEN_ALERT' in labels and bool(superevent['em_type']))
    if raven_coinc:
        # A combined sky map is only copied over if the external event itself
        # has both the RAVEN_ALERT and COMBINEDSKYMAP_READY labels.
        ext_labels = gracedb.get_labels(superevent['em_type'])
        combined_skymap_needed = \
            {"RAVEN_ALERT", "COMBINEDSKYMAP_READY"}.issubset(set(ext_labels))

    # FIXME: This if statement is always True, we should get rid of it.
    if skymap_needed or em_bright_needed or p_astro_needed or \
            combined_skymap_needed or rapidpe_pastro_needed:
        # Scan the superevent log for the files that are still missing.
        # Each match overwrites the previous one, so the last matching log
        # entry wins (presumably the most recent upload -- this relies on the
        # ordering returned by get_log; TODO confirm).
        for message in gracedb.get_log(superevent_id):
            t = message['tag_names']
            f = message['filename']
            v = message['file_version']
            # '<filename>,<version>' string identifying this file version
            fv = '{},{}'.format(f, v)
            # Skip log entries without an attached file.
            if not f:
                continue
            if skymap_needed \
                    and {'sky_loc', 'public'}.issubset(t) \
                    and f.endswith('.multiorder.fits') \
                    and 'combined' not in f:
                skymap_filename = fv
            if em_bright_needed \
                    and 'em_bright' in t \
                    and f.endswith('.json'):
                em_bright_filename = fv
            if p_astro_needed \
                    and {'public'}.issubset(t) \
                    and 'p_astro' in t \
                    and f.endswith('.json'):
                p_astro_filename = fv
            if combined_skymap_needed \
                    and {'sky_loc', 'ext_coinc'}.issubset(t) \
                    and f.startswith('combined-ext.') \
                    and 'fit' in f:
                combined_skymap_filename = fv
            if rapidpe_pastro_needed \
                    and 'p_astro' in t \
                    and f == 'RapidPE_RIFT.p_astro.json':
                rapidpe_pastro_filename = fv

    if combined_skymap_needed:
        # for every alert, copy combined sky map over if applicable
        # FIXME: use file inheritance once available
        ext_id = superevent['em_type']
        if combined_skymap_filename:
            # If previous sky map, increase version by 1
            combined_skymap_filename_base, v = \
                combined_skymap_filename.split(',')
            v = str(int(v) + 1)
            combined_skymap_filename = \
                combined_skymap_filename_base + ',' + v
        else:
            # No combined sky map on the superevent yet: choose the base name
            # to match the format of the GW sky map and start at version 0.
            # NOTE(review): skymap_filename may still be None here if no sky
            # map was passed in or found in the log, in which case the `in`
            # test below would raise TypeError -- confirm callers guarantee
            # a sky map at this point.
            combined_skymap_filename_base = \
                (external_skymaps.COMBINED_SKYMAP_FILENAME_MULTIORDER
                 if '.multiorder.fits' in skymap_filename else
                 external_skymaps.COMBINED_SKYMAP_FILENAME_FLAT)
            combined_skymap_filename = combined_skymap_filename_base + ',0'
        message = 'Combined LVK-external sky map copied from {0}'.format(
            ext_id)
        message_png = (
            'Mollweide projection of <a href="/api/events/{se_id}/files/'
            '{filename}">{filename}</a>, copied from {ext_id}').format(
                se_id=superevent_id,
                ext_id=ext_id,
                filename=combined_skymap_filename)

        # Two parallel chains: copy the combined FITS file from the external
        # event to the superevent (then apply COMBINEDSKYMAP_READY), and copy
        # the PNG rendering.
        combined_skymap_canvas = group(
            gracedb.download.si(combined_skymap_filename_base, ext_id)
            |
            gracedb.upload.s(
                combined_skymap_filename_base, superevent_id,
                message, ['sky_loc', 'ext_coinc', 'public'])
            |
            gracedb.create_label.si('COMBINEDSKYMAP_READY', superevent_id),

            gracedb.download.si(external_skymaps.COMBINED_SKYMAP_FILENAME_PNG,
                                ext_id)
            |
            gracedb.upload.s(
                external_skymaps.COMBINED_SKYMAP_FILENAME_PNG, superevent_id,
                message_png, ['sky_loc', 'ext_coinc', 'public']
            )
            |
            # Pass None to download_andor_expose_group
            identity.si()
        )

    # circular template not needed for less-significant alerts
    # (nor, per the set below, for update alerts)
    if alert_type in {'earlywarning', 'preliminary', 'initial'}:
        if raven_coinc:
            circular_task = circulars.create_emcoinc_circular.si(superevent_id)
            circular_filename = '{}-emcoinc-circular.txt'.format(alert_type)
            tags = ['em_follow', 'ext_coinc']

        else:
            circular_task = circulars.create_initial_circular.si(superevent_id)
            circular_filename = '{}-circular.txt'.format(alert_type)
            tags = ['em_follow']

        # Create the circular text, then upload it to the superevent.
        circular_canvas = (
            circular_task
            |
            gracedb.upload.s(
                circular_filename,
                superevent_id,
                'Template for {} GCN Circular'.format(alert_type),
                tags=tags)
        )

    else:
        # Placeholder so that the final canvas has the same shape.
        circular_canvas = identity.si()

    # less-significant alerts have "preliminary" voevent notice type
    alert_type_voevent = 'preliminary' if alert_type == 'less-significant' \
        else alert_type
    # set the significant field in the VOEvent based on
    # less-significant/significant alert.
    # For kafka alerts the analogous field is set in alerts.py.
    # (see comment before defining kafka_alert_canvas)
    voevent_significance = 0 if alert_type == 'less-significant' else 1

    if filecontents and not combined_skymap_filename:
        # File contents were supplied by the caller, so em_bright and p_astro
        # do not have to be downloaded from GraceDB.
        skymap, em_bright, p_astro_dict = filecontents

        # check high profile and apply label if true
        if alert_type == 'preliminary':
            high_profile_canvas = rrt_utils.check_high_profile.si(
                skymap, em_bright, p_astro_dict, superevent
            )
        else:
            high_profile_canvas = identity.si()

        download_andor_expose_group = []
        if rapidpe_pastro_filename is None:
            # No RapidPE-RIFT p_astro on the superevent: the VOEvent and the
            # kafka alert use the supplied classification directly (immutable
            # signatures, so results of preceding tasks are ignored).
            voevent_canvas = _create_voevent.si(
                (em_bright, p_astro_dict),
                superevent_id,
                alert_type_voevent,
                Significant=voevent_significance,
                skymap_filename=skymap_filename,
                internal=False,
                open_alert=True,
                raven_coinc=raven_coinc,
                combined_skymap_filename=combined_skymap_filename
            )
            # Sentinel: this canvas is never added to the final canvas in
            # this branch because rapidpe_pastro_filename is None (see the
            # `if rapidpe_pastro_filename` guard near the end).
            rapidpe_canvas = _update_rapidpe_pastro_shouldnt_run.s()

            # kafka alerts have a field called "significant" based on
            # https://dcc.ligo.org/LIGO-G2300151/public
            # The alert_type value passed to alerts.send is used to
            # set this field in the alert dictionary
            kafka_alert_canvas = alerts.send.si(
                (skymap, em_bright, p_astro_dict),
                superevent,
                alert_type,
                raven_coinc=raven_coinc
            )
        else:
            # A RapidPE-RIFT p_astro exists: the classification handed to the
            # VOEvent and kafka tasks comes from the output of rapidpe_canvas
            # (mutable signatures receive the preceding task's result).
            voevent_canvas = _create_voevent.s(
                superevent_id,
                alert_type_voevent,
                Significant=voevent_significance,
                skymap_filename=skymap_filename,
                internal=False,
                open_alert=True,
                raven_coinc=raven_coinc,
                combined_skymap_filename=combined_skymap_filename
            )
            # The RapidPE-RIFT p_astro download is the first element of the
            # group, which is what _update_rapidpe_pastro unpacks when
            # pipeline_pastro is passed explicitly.
            download_andor_expose_group += [
                gracedb.download.si(rapidpe_pastro_filename, superevent_id)
            ]

            kafka_alert_canvas = _check_pastro_and_send_alert.s(
                skymap,
                em_bright,
                superevent,
                alert_type,
                raven_coinc=raven_coinc
            )

            rapidpe_canvas = (
                _update_rapidpe_pastro.s(
                    em_bright=em_bright,
                    pipeline_pastro=p_astro_dict)
                |
                _upload_rapidpe_pastro_json.s(
                    superevent_id,
                    rapidpe_pastro_filename
                )
            )
    else:
        # Download em_bright and p_astro files here for voevent
        # (a missing file is represented by None in the group's results)
        download_andor_expose_group = [
            gracedb.download.si(em_bright_filename, superevent_id) if
            em_bright_filename is not None else identity.s(None),
            gracedb.download.si(p_astro_filename, superevent_id) if
            p_astro_filename is not None else identity.s(None),
        ]
        high_profile_canvas = identity.si()

        voevent_canvas = _create_voevent.s(
            superevent_id,
            alert_type_voevent,
            Significant=voevent_significance,
            skymap_filename=skymap_filename,
            internal=False,
            open_alert=True,
            raven_coinc=raven_coinc,
            combined_skymap_filename=combined_skymap_filename
        )

        if rapidpe_pastro_filename:
            # Third element of the group: _update_rapidpe_pastro unpacks
            # (em_bright, pipeline p_astro, RapidPE p_astro, ...) from it.
            download_andor_expose_group += [
                gracedb.download.si(rapidpe_pastro_filename, superevent_id)
            ]

            rapidpe_canvas = (
                _update_rapidpe_pastro.s()
                |
                _upload_rapidpe_pastro_json.s(
                    superevent_id,
                    rapidpe_pastro_filename
                )
            )

        # The skymap has not been downloaded at this point, so we need to
        # download it before we can assemble the kafka alerts and send them
        kafka_alert_canvas = alerts.download_skymap_and_send_alert.s(
            superevent,
            alert_type,
            skymap_filename=skymap_filename,
            raven_coinc=raven_coinc,
            combined_skymap_filename=combined_skymap_filename
        )

    to_expose = [skymap_filename, em_bright_filename, p_astro_filename]
    # Since PE skymap images, HTML, and gzip FITS are not made public when they
    # are uploaded, we need to expose them here.
    if (
        skymap_filename is not None and 'bilby' in skymap_filename.lower()
    ):
        prefix, _, _ = skymap_filename.partition('.multiorder.fits')
        to_expose += [f'{prefix}.html', f'{prefix}.png',
                      f'{prefix}.volume.png', f'{prefix}.fits.gz']
    # Expose the superevent and tag every known file as public. These tasks
    # are appended after the downloads, so the downloads keep their leading
    # positions in the group's result list.
    download_andor_expose_group += [
        gracedb.expose.si(superevent_id),
        *(
            gracedb.create_tag.si(filename, 'public', superevent_id)
            for filename in to_expose if filename is not None
        )
    ]

    # Once the VOEvent exists: download it and send it over GCN, and tag it
    # public (both tasks receive the result of _create_voevent, presumably
    # the VOEvent filename -- TODO confirm).
    voevent_canvas |= group(
        gracedb.download.s(superevent_id)
        |
        gcn.send.s(),

        gracedb.create_tag.s('public', superevent_id)
    )

    if combined_skymap_needed:
        download_andor_expose_group += [combined_skymap_canvas]

    # Label recording that the notice was sent; only less-significant and
    # preliminary alerts get one.
    sent_label_canvas = identity.si()
    if alert_type == 'less-significant':
        sent_label_canvas = gracedb.create_label.si(
            'LOW_SIGNIF_PRELIM_SENT',
            superevent_id
        )
    elif alert_type == 'preliminary':
        sent_label_canvas = gracedb.create_label.si(
            'GCN_PRELIM_SENT',
            superevent_id
        )

    # NOTE: The following canvas structure was used to fix #480
    canvas = (
        group(download_andor_expose_group)
    )
    # The RapidPE-RIFT p_astro update step is only inserted if such a file
    # was found in the superevent log.
    if rapidpe_pastro_filename:
        canvas |= rapidpe_canvas

    # Finally, in parallel: VOEvent (followed by circular and sent-label),
    # the kafka alert, and the high-profile check.
    canvas |= (
        group(
            voevent_canvas
            |
            group(
                circular_canvas,

                sent_label_canvas
            ),

            kafka_alert_canvas,

            high_profile_canvas
        )
    )

    canvas.apply_async()
@app.task(shared=False)
def _update_rapidpe_pastro(input_list, em_bright=None, pipeline_pastro=None):
    """Reconcile RapidPE-RIFT's p_astro with the pipeline's terrestrial
    probability.

    Parameters
    ----------
    input_list : list
        Results of ``download_andor_expose_group`` in
        :meth:`earlywarning_preliminary_initial_update_alert`. If
        ``pipeline_pastro`` is given, only the first element (the RapidPE-RIFT
        p_astro JSON) is used; otherwise the first three elements are taken
        as em_bright, the pipeline p_astro JSON and the RapidPE-RIFT p_astro
        JSON.
    em_bright : optional
        em_bright contents, returned unchanged. Overwritten from
        ``input_list`` when ``pipeline_pastro`` is None.
    pipeline_pastro : str, optional
        Pipeline p_astro as a JSON string.

    Returns
    -------
    tuple
        ``(em_bright, rapidpe_pastro, rapidpe_pastro_updated)``. If the
        ``"Terrestrial"`` entries of both p_astro files agree, the RapidPE-RIFT
        JSON is returned as received and the flag is False. Otherwise the
        RapidPE-RIFT p_astro is renormalized with the pipeline's terrestrial
        probability, re-serialized to JSON, and the flag is True.
    """
    if pipeline_pastro is not None:
        # Classification was supplied by the caller; only the RapidPE-RIFT
        # file comes from the preceding group.
        rapidpe_json, *_ = input_list
    else:
        em_bright, pipeline_pastro, rapidpe_json, *_ = input_list

    pipeline_probs = json.loads(pipeline_pastro)
    rapidpe_probs = json.loads(rapidpe_json)

    if rapidpe_probs["Terrestrial"] == pipeline_probs["Terrestrial"]:
        # Already consistent: hand back the file untouched.
        return em_bright, rapidpe_json, False

    renormalized = rpe_pastro.renormalize_pastro_with_pipeline_pterr(
        rapidpe_probs, pipeline_probs
    )
    return em_bright, json.dumps(renormalized), True
@app.task(shared=False)
def _update_rapidpe_pastro_shouldnt_run(*args, **kwargs):
    """Sentinel task that must never execute.

    It stands in for ``rapidpe_canvas`` when no RapidPE-RIFT p_astro file
    exists, a case in which ``rapidpe_canvas`` is not supposed to be added to
    the alert canvas.

    Any arguments are accepted and ignored: the signature is created with
    ``.s()``, so if it were ever chained it would be called with the result
    of the preceding task, and the diagnostic below should be raised rather
    than a ``TypeError`` about unexpected arguments.

    Raises
    ------
    RuntimeError
        Always.
    """
    raise RuntimeError(
        "The `rapidpe_canvas' was executed where it should "
        "not have been. A bug must have been introduced."
    )
@gracedb.task(shared=False)
def _upload_rapidpe_pastro_json(
    input_list,
    superevent_id,
    rapidpe_pastro_filename
):
    """Upload a new version of RapidPE_RIFT.p_astro.json if it was updated.

    Parameters
    ----------
    input_list : list
        Output of :meth:`_update_rapidpe_pastro`; its last element is the
        ``rapidpe_pastro_updated`` flag and its second element is the
        RapidPE-RIFT p_astro file content.
    superevent_id : str
        The superevent to upload to.
    rapidpe_pastro_filename : str
        Not used in the body of this task.

    Returns
    -------
    list
        ``input_list`` without its trailing flag.

    Notes
    -----
    When the flag is True the content is uploaded as
    ``RapidPE_RIFT.p_astro.json`` with the tags ``pe``, ``p_astro`` and
    ``public``. When it is not, nothing is uploaded.
    """
    # input_list is output from update_rapidpe_pastro
    *passthrough, was_updated = input_list

    if was_updated is not True:
        return passthrough

    gracedb.upload(
        input_list[1],
        "RapidPE_RIFT.p_astro.json",
        superevent_id,
        "RapidPE-RIFT Pastro results",
        ("pe", "p_astro", "public")
    )
    return passthrough
@app.task(shared=False)
def _check_pastro_and_send_alert(input_classification, skymap, em_bright,
                                 superevent, alert_type, raven_coinc=False):
    """Wrapper for :meth:`~gwcelery.tasks.alerts.send` meant to take a
    potentially new p-astro as input from the preceding task.

    ``input_classification`` must be a pair; only its second element (the
    p_astro) is used. The em_bright sent in the alert is always the one
    passed explicitly as ``em_bright``.
    """
    _unused, latest_p_astro = input_classification
    classification = (skymap, em_bright, latest_p_astro)
    alerts.send.delay(
        classification,
        superevent,
        alert_type,
        raven_coinc=raven_coinc
    )
@gracedb.task(ignore_result=True, shared=False)
def initial_alert(filenames, alert):
    """Produce an initial alert.

    Thin wrapper that forwards to
    :meth:`~gwcelery.tasks.orchestrator.earlywarning_preliminary_initial_update_alert`
    with ``alert_type='initial'``, using the superevent embedded in the
    alert (``alert['object']``).

    Parameters
    ----------
    filenames : tuple
        A list of the sky map, em_bright, and p_astro filenames.
    alert : dict
        IGWN-Alert dictionary

    Notes
    -----
    This function is decorated with :obj:`gwcelery.tasks.gracedb.task` rather
    than :obj:`gwcelery.app.task` so that a synchronous call to
    :func:`gwcelery.tasks.gracedb.get_log` is retried in the event of GraceDB
    API failures.

    """
    superevent = alert['object']
    earlywarning_preliminary_initial_update_alert(
        filenames, superevent, 'initial')
@gracedb.task(ignore_result=True, shared=False)
def update_alert(filenames, superevent_id):
    """Produce an update alert.

    Fetches the superevent synchronously and forwards to
    :meth:`~gwcelery.tasks.orchestrator.earlywarning_preliminary_initial_update_alert`
    with ``alert_type='update'``.

    Parameters
    ----------
    filenames : tuple
        A list of the sky map, em_bright, and p_astro filenames.
    superevent_id : str
        The superevent ID.

    Notes
    -----
    This function is decorated with :obj:`gwcelery.tasks.gracedb.task` rather
    than :obj:`gwcelery.app.task` so that a synchronous call to
    :func:`gwcelery.tasks.gracedb.get_log` is retried in the event of GraceDB
    API failures.

    """
    earlywarning_preliminary_initial_update_alert(
        filenames,
        gracedb.get_superevent._orig_run(superevent_id),
        'update'
    )
@app.task(ignore_result=True, shared=False)
def retraction_alert(alert):
    """Produce a retraction alert.

    Builds and launches a canvas with two parallel branches:

    * expose the superevent, then in parallel create the retraction VOEvent
      (followed by downloading it and sending it over GCN, and tagging it
      public) and send the retraction through :meth:`alerts.send`;
    * create the retraction circular and upload it as
      ``retraction-circular.txt``.

    Parameters
    ----------
    alert : dict
        IGWN-Alert dictionary; ``alert['uid']`` is the superevent ID and
        ``alert['object']`` is passed to :meth:`alerts.send`.
    """
    superevent_id = alert['uid']

    # What happens to the VOEvent once it has been created.
    voevent_followup = group(
        gracedb.download.s(superevent_id)
        |
        gcn.send.s(),

        gracedb.create_tag.s('public', superevent_id)
    )
    voevent_canvas = (
        _create_voevent.si(
            None, superevent_id, 'retraction',
            internal=False,
            open_alert=True
        )
        |
        voevent_followup
    )
    kafka_canvas = alerts.send.si(
        None,
        alert['object'],
        'retraction'
    )

    # Notices are only sent after the superevent has been exposed.
    notice_canvas = (
        gracedb.expose.si(superevent_id)
        |
        group(voevent_canvas, kafka_canvas)
    )
    circular_canvas = (
        circulars.create_retraction_circular.si(superevent_id)
        |
        gracedb.upload.s(
            'retraction-circular.txt',
            superevent_id,
            'Template for retraction GCN Circular',
            tags=['em_follow']
        )
    )

    group(notice_canvas, circular_canvas).apply_async()