Coverage for gwcelery/igwn_alert/bootsteps.py: 41%

61 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-04-17 17:22 +0000

1import json 

2import warnings 

3from threading import Thread 

4 

5from adc.errors import KafkaException 

6from celery import bootsteps 

7from celery.utils.log import get_logger 

8from hop.models import JSONBlob 

9from igwn_alert import client 

10 

11from .signals import igwn_alert_received 

12 

13__all__ = ('Receiver',) 

14 

15log = get_logger(__name__) 

16 

17 

18# Implemented from https://git.ligo.org/computing/igwn-alert/client/-/blob/main/igwn_alert/client.py # noqa: E501 

19# with minor differences 

20class IGWNAlertClient(client): 

21 def __init__(self, *args, **kwargs): 

22 super().__init__(*args, **kwargs) 

23 self.running = False 

24 

25 def listen(self, callback, topics): 

26 """ 

27 Set a callback to be executed for each pubsub item received. 

28 

29 Parameters 

30 ---------- 

31 callback : callable 

32 A function of two arguments: the topic and the alert payload. 

33 When set to :obj:`None`, print out alert payload. 

34 topics : :obj:`list` of :obj:`str` 

35 Topic or list of topics to listen to. 

36 """ 

37 self.running = True 

38 while self.running: 

39 self.stream_obj = self.open(self._construct_topic_url(topics), "r") # noqa: E501 

40 try: 

41 with self.stream_obj as s: 

42 for payload, metadata in s.read( 

43 metadata=True, 

44 batch_size=self.batch_size, 

45 batch_timeout=self.batch_timeout): 

46 # Fix in case message is in new format: 

47 if isinstance(payload, JSONBlob): 

48 payload = payload.content 

49 else: 

50 try: 

51 payload = json.loads(payload) 

52 except (json.JSONDecodeError, TypeError) as e: 

53 warnings.warn("Payload is not valid " 

54 "json: {}".format(e)) 

55 if not callback: 

56 print("New message from topic {topic}: {msg}" 

57 .format(topic=metadata.topic, msg=payload)) 

58 else: 

59 callback(topic=metadata.topic.split('.')[1], 

60 payload=payload) 

61 # FIXME: revisit when https://git.ligo.org/computing/igwn-alert/client/-/issues/19 # noqa: E501 

62 # is addressed 

63 except KafkaException as err: 

64 if err.fatal: 

65 # stop running and close before raising error 

66 self.running = False 

67 self.stream_obj.close() 

68 raise 

69 else: 

70 log.warning( 

71 "non-fatal error from kafka: {}".format(err.name)) 

72 

73 

74class IGWNAlertBootStep(bootsteps.ConsumerStep): 

75 """Generic boot step to limit us to appropriate kinds of workers. 

76 

77 Only include this bootstep in workers that are started with the 

78 ``--igwn-alerts`` command line option. 

79 """ 

80 

81 def __init__(self, consumer, igwn_alert=False, **kwargs): 

82 self.enabled = bool(igwn_alert) 

83 

84 def start(self, consumer): 

85 log.info('Starting %s', self.name) 

86 

87 def stop(self, consumer): 

88 log.info('Stopping %s', self.name) 

89 

90 

91def _send_igwn_alert(topic, payload): 

92 """Shim to send Celery signal.""" 

93 igwn_alert_received.send(None, topic=topic, payload=payload) 

94 

95 

96class Receiver(IGWNAlertBootStep): 

97 """Run the global IGWN alert receiver in background thread.""" 

98 

99 name = 'IGWN Alert client' 

100 

101 def start(self, consumer): 

102 super().start(consumer) 

103 

104 self._client = IGWNAlertClient( 

105 server=consumer.app.conf['igwn_alert_server'], 

106 noauth=consumer.app.conf['igwn_alert_noauth'], 

107 group=consumer.app.conf['igwn_alert_group']) 

108 self.thread = Thread( 

109 target=self._client.listen, 

110 args=(_send_igwn_alert, consumer.app.conf['igwn_alert_topics']), 

111 name='IGWNReceiverThread') 

112 self.thread.start() 

113 

114 def stop(self, consumer): 

115 super().stop(consumer) 

116 if self._client.running: 

117 self._client.running = False 

118 self._client.stream_obj._consumer.stop() 

119 self.thread.join() 

120 

121 def info(self, consumer): 

122 return {'igwn-alert-topics': consumer.app.conf[ 

123 'igwn_alert_topics'].intersection(self._client.get_topics())}