Coverage for gwcelery/igwn_alert/bootsteps.py: 41%
61 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-17 17:22 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-17 17:22 +0000
1import json
2import warnings
3from threading import Thread
5from adc.errors import KafkaException
6from celery import bootsteps
7from celery.utils.log import get_logger
8from hop.models import JSONBlob
9from igwn_alert import client
11from .signals import igwn_alert_received
13__all__ = ('Receiver',)
15log = get_logger(__name__)
18# Implemented from https://git.ligo.org/computing/igwn-alert/client/-/blob/main/igwn_alert/client.py # noqa: E501
19# with minor differences
20class IGWNAlertClient(client):
21 def __init__(self, *args, **kwargs):
22 super().__init__(*args, **kwargs)
23 self.running = False
25 def listen(self, callback, topics):
26 """
27 Set a callback to be executed for each pubsub item received.
29 Parameters
30 ----------
31 callback : callable
32 A function of two arguments: the topic and the alert payload.
33 When set to :obj:`None`, print out alert payload.
34 topics : :obj:`list` of :obj:`str`
35 Topic or list of topics to listen to.
36 """
37 self.running = True
38 while self.running:
39 self.stream_obj = self.open(self._construct_topic_url(topics), "r") # noqa: E501
40 try:
41 with self.stream_obj as s:
42 for payload, metadata in s.read(
43 metadata=True,
44 batch_size=self.batch_size,
45 batch_timeout=self.batch_timeout):
46 # Fix in case message is in new format:
47 if isinstance(payload, JSONBlob):
48 payload = payload.content
49 else:
50 try:
51 payload = json.loads(payload)
52 except (json.JSONDecodeError, TypeError) as e:
53 warnings.warn("Payload is not valid "
54 "json: {}".format(e))
55 if not callback:
56 print("New message from topic {topic}: {msg}"
57 .format(topic=metadata.topic, msg=payload))
58 else:
59 callback(topic=metadata.topic.split('.')[1],
60 payload=payload)
61 # FIXME: revisit when https://git.ligo.org/computing/igwn-alert/client/-/issues/19 # noqa: E501
62 # is addressed
63 except KafkaException as err:
64 if err.fatal:
65 # stop running and close before raising error
66 self.running = False
67 self.stream_obj.close()
68 raise
69 else:
70 log.warning(
71 "non-fatal error from kafka: {}".format(err.name))
74class IGWNAlertBootStep(bootsteps.ConsumerStep):
75 """Generic boot step to limit us to appropriate kinds of workers.
77 Only include this bootstep in workers that are started with the
78 ``--igwn-alerts`` command line option.
79 """
81 def __init__(self, consumer, igwn_alert=False, **kwargs):
82 self.enabled = bool(igwn_alert)
84 def start(self, consumer):
85 log.info('Starting %s', self.name)
87 def stop(self, consumer):
88 log.info('Stopping %s', self.name)
91def _send_igwn_alert(topic, payload):
92 """Shim to send Celery signal."""
93 igwn_alert_received.send(None, topic=topic, payload=payload)
96class Receiver(IGWNAlertBootStep):
97 """Run the global IGWN alert receiver in background thread."""
99 name = 'IGWN Alert client'
101 def start(self, consumer):
102 super().start(consumer)
104 self._client = IGWNAlertClient(
105 server=consumer.app.conf['igwn_alert_server'],
106 noauth=consumer.app.conf['igwn_alert_noauth'],
107 group=consumer.app.conf['igwn_alert_group'])
108 self.thread = Thread(
109 target=self._client.listen,
110 args=(_send_igwn_alert, consumer.app.conf['igwn_alert_topics']),
111 name='IGWNReceiverThread')
112 self.thread.start()
114 def stop(self, consumer):
115 super().stop(consumer)
116 if self._client.running:
117 self._client.running = False
118 self._client.stream_obj._consumer.stop()
119 self.thread.join()
121 def info(self, consumer):
122 return {'igwn-alert-topics': consumer.app.conf[
123 'igwn_alert_topics'].intersection(self._client.get_topics())}