Coverage for gwcelery/kafka/bootsteps.py: 68%
152 statements
coverage.py v7.4.2, created at 2024-03-28 17:21 +0000

import json
from functools import cache
from os import path
from threading import Thread

from celery import bootsteps
from celery.concurrency import solo
from celery.utils.log import get_logger
from confluent_kafka.error import KafkaException
from fastavro.schema import parse_schema
from hop import Stream, auth
from hop.io import list_topics
from hop.models import AvroBlob, JSONBlob
from xdg.BaseDirectory import xdg_config_home

from ..util import read_json
from .signals import kafka_record_consumed

__all__ = ('Producer', 'Consumer')

log = get_logger(__name__)


@cache
def schema():
    # The order does not matter other than that the Alert schema must be
    # loaded last, because it references the other schemas. All of the
    # schemas are saved in named_schemas, but we only need to save a
    # reference to the Alert schema to write the packet.
    # NOTE Specifying expand=True when calling parse_schema is okay when only
    # one schema contains references to other schemas; in our case only the
    # alerts schema contains references to other schemas. More complicated
    # relationships between schemas, though, can lead to behavior that does
    # not conform to the Avro spec, and a different method will need to be
    # used to load the schema. See
    # https://github.com/fastavro/fastavro/issues/624 for more info.
    named_schemas = {}
    for s in ['igwn.alerts.v1_0.ExternalCoincInfo.avsc',
              'igwn.alerts.v1_0.EventInfo.avsc',
              'igwn.alerts.v1_0.AlertType.avsc',
              'igwn.alerts.v1_0.Alert.avsc']:
        schema = parse_schema(read_json('igwn_gwalert_schema', s),
                              named_schemas, expand=True)

    return schema


def _load_hopauth_map():
    hop_auth = auth.load_auth()
    with open(path.join(xdg_config_home,
                        'gwcelery/kafka_credential_map.json'),
              'r') as fo:
        kafka_credential_map = json.load(fo)

    return hop_auth, kafka_credential_map
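

# A hypothetical sketch of the two credential sources loaded above (all
# usernames and passwords here are illustrative, not real). get_auth() below
# looks up kafka_credential_map[prefix][name] to find a username, then
# searches the hop auth credentials for a matching entry:
#
#     ~/.config/gwcelery/kafka_credential_map.json
#         {"consumer": {"fermi": "fermi-consumer-user"},
#          "producer": {"example_broker": "gwcelery-producer-user"}}
#
#     hop auth file (managed by the hop client, e.g. with ``hop auth add``):
#         [[auth]]
#         username = "gwcelery-producer-user"
#         password = "..."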


class AvroBlobWrapper(AvroBlob):

    def __init__(self, payload):
        super().__init__([payload], schema())


class KafkaBase:
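    """Base class shared by the Kafka listener and writer classes below.

    A hypothetical sketch of the ``config`` mapping, inferred from the keys
    this class reads (the URL is illustrative)::

        {'url': 'kafka://example.org/example-topic',
         'suffix': 'avro',   # 'avro' or 'json'
         'auth': False}      # present only to disable authentication
    """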

    def __init__(self, name, config, prefix):
        self.name = name
        self._config = config
        if config.get('auth') is not False:
            # Users only add auth to config to disable authentication
            self._credential = self.get_auth(prefix)
        else:
            # Don't use credentials
            self._credential = False
        self._hop_stream = Stream(self._credential)

        # FIXME Drop get_payload_content method once
        # https://github.com/scimma/hop-client/pull/190 is merged
        if config['suffix'] == 'avro':
            self.serialization_model = AvroBlobWrapper
            self.get_payload_content = lambda payload: payload.content[0]
        elif config['suffix'] == 'json':
            self.serialization_model = JSONBlob
            self.get_payload_content = lambda payload: payload.content
        else:
            raise NotImplementedError(
                'Supported serialization method required for alert notices'
            )

    def get_auth(self, prefix):
        hop_auth, kafka_credential_map = _load_hopauth_map()

        # kafka_credential_map maps the logical name for a broker topic to a
        # username
        username = kafka_credential_map.get(prefix, {}).get(self.name)
        if username is None:
            raise ValueError('Unable to find {} entry in kafka credential map '
                             'for {}'.format(prefix, self.name))

        # hop auth maps usernames to passwords/hostnames
        target_auth = None
        for cred in hop_auth:
            if cred.username != username:
                continue
            target_auth = cred
            break
        else:
            raise ValueError('Unable to find entry in hop auth file for '
                             'username {}'.format(username))
        return target_auth


class KafkaListener(KafkaBase):

    def __init__(self, name, config):
        super().__init__(name, config, 'consumer')
        self._open_hop_stream = None
        self.running = False
        # Don't kill the worker if the listener can't connect
        try:
            self._open_hop_stream = self._hop_stream.open(config['url'], 'r')
        except KafkaException:
            log.exception('Connection to %s failed', self._config["url"])
        except ValueError:
            # The hop client raises a ValueError if the topic doesn't exist
            # on the broker
            log.exception('Connection to %s failed', self._config["url"])

    def listen(self):
        self.running = True
        # Restart the consumer when non-fatal errors come up, similar to
        # gwcelery.igwn_alert.IGWNAlertClient
        while self.running:
            try:
                for message in self._open_hop_stream:
                    # Send signal
                    kafka_record_consumed.send(
                        None,
                        name=self.name,
                        record=self.get_payload_content(message)
                    )
            except KafkaException as exception:
                err = exception.args[0]
                if self.running is False:
                    # The close attempt in the KafkaListener stop method
                    # throws a KafkaException that's caught by this
                    # try-except, so we just have to catch this case for the
                    # worker to shut down gracefully
                    pass
                elif err.fatal():
                    # Stop running and close before raising the error
                    self.running = False
                    self._open_hop_stream.close()
                    raise
                else:
                    log.warning(
                        "non-fatal error from kafka: {}".format(err.name))


class KafkaWriter(KafkaBase):
    """Write Kafka topics and monitor health."""

    def __init__(self, name, config):
        super().__init__(name, config, 'producer')
        self._open_hop_stream = self._hop_stream.open(
            config['url'], 'w',
            message_max_bytes=1024 * 1024 * 8,
            compression_type='zstd')

        # Set up flag for failed delivery of messages
        self.kafka_delivery_failures = False

    def kafka_topic_up(self):
        """Check for problems with the broker and topic. Returns True if the
        broker and topic appear to be up, returns False otherwise."""
        kafka_url = self._config['url']
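        # The URL is assumed to have the form kafka://broker/topic, so
        # splitting on '/' yields ['kafka:', '', broker, topic]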
        _, _, broker, topic = kafka_url.split('/')
        try:
            topics = list_topics(kafka_url, auth=self._credential, timeout=5)
            if topics[topic].error is None:
                log.info(f'{kafka_url} appears to be functioning properly')
                return True
            else:
                log.error(f'{topic} at {broker} appears to be down')
                return False
        except KafkaException:
            log.error(f'{broker} appears to be down')
            return False

    def _delivery_cb(self, kafka_error, message):
        # FIXME Get rid of if-else logic once
        # https://github.com/scimma/hop-client/pull/190 is merged
        if self._config['suffix'] == 'avro':
            record = AvroBlob.deserialize(message.value()).content[0]
        else:
            record = JSONBlob.deserialize(message.value()).content
        kafka_url = self._config['url']
        if kafka_error is None:
            self.kafka_delivery_failures = False
        else:
            log.error(f'Received error code {kafka_error.code()} '
                      f'({kafka_error.name()}, {kafka_error.str()}) '
                      f'when delivering {record["superevent_id"]} '
                      f'{record["alert_type"]} alert to {kafka_url}')
            self.kafka_delivery_failures = True

    def write(self, payload):
        self._open_hop_stream.write(payload,
                                    delivery_callback=self._delivery_cb)
        self._open_hop_stream.flush()
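
    # A minimal usage sketch (the broker key and record name are
    # hypothetical): callers look up the writer that Producer.start() stores
    # in the app configuration and wrap the record in the writer's
    # serialization model before writing:
    #
    #     writer = app.conf['kafka_streams']['example_broker']
    #     writer.write(writer.serialization_model(alert_dict))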


class KafkaBootStep(bootsteps.ConsumerStep):
    """Generic boot step to limit us to appropriate kinds of workers.

    Only include this bootstep in workers that are configured to listen to
    the ``kafka`` queue.
    """

    def create(self, consumer):
        if not isinstance(consumer.pool, solo.TaskPool):
            raise RuntimeError(
                'The Kafka broker only works with the "solo" task pool. '
                'Start the worker with "--queues=kafka --pool=solo".')


class Consumer(KafkaBootStep):
    """Run MOU Kafka consumers in background threads."""

    name = 'Kafka consumer'

    def include_if(self, consumer):
        return 'kafka-consumer' in consumer.app.amqp.queues

    def start(self, consumer):
        log.info(f'Starting {self.name}, topics: ' +
                 ' '.join(config['url'] for config in
                          consumer.app.conf['kafka_consumer_config'].values()))
        self._listeners = {
            key: KafkaListener(key, config) for key, config in
            consumer.app.conf['kafka_consumer_config'].items()
        }
        self.threads = [
            Thread(target=s.listen, name=f'{key}_KafkaConsumerThread')
            for key, s in self._listeners.items()
            if s._open_hop_stream is not None
        ]
        for thread in self.threads:
            thread.start()

    def stop(self, consumer):
        log.info('Closing connection to topics: ' +
                 ' '.join(listener._config['url'] for listener in
                          self._listeners.values()
                          if listener._open_hop_stream is not None))
        for s in self._listeners.values():
            s.running = False
            if s._open_hop_stream is not None:
                s._open_hop_stream.close()

        for thread in self.threads:
            thread.join()

    def info(self, consumer):
        return {
            'active_kafka_consumers': {
                listener.name for listener in self._listeners.values()
                if listener._open_hop_stream is not None
            }
        }


class Producer(KafkaBootStep):
    """Run the global Kafka producers in a background thread.

    Flags that document the health of the connections are made available for
    :ref:`inspection <celery:worker-inspect>` with the ``gwcelery inspect
    stats`` command under the ``kafka_topic_up`` and
    ``kafka_delivery_failures`` keys.
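
    A hypothetical excerpt of the resulting stats, mirroring what
    :meth:`info` returns (the broker key is illustrative)::

        {'kafka_topic_up': {'example_broker': True},
         'kafka_delivery_failures': {'example_broker': False}}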
280 """

    name = 'Kafka producer'

    def include_if(self, consumer):
        return 'kafka-producer' in consumer.app.amqp.queues

    def start(self, consumer):
        log.info(f'Starting {self.name}, topics: ' +
                 ' '.join(config['url'] for config in
                          consumer.app.conf['kafka_alert_config'].values()))
        consumer.app.conf['kafka_streams'] = self._writers = {
            brokerhost: KafkaWriter(brokerhost, config) for brokerhost, config
            in consumer.app.conf['kafka_alert_config'].items()
        }

    def stop(self, consumer):
        log.info('Closing connection to topics: ' +
                 ' '.join(config['url'] for config in
                          consumer.app.conf['kafka_alert_config'].values()))
        for s in self._writers.values():
            s._open_hop_stream.close()

    def info(self, consumer):
        return {'kafka_topic_up': {
                    brokerhost: writer.kafka_topic_up()
                    for brokerhost, writer in self._writers.items()
                },
                'kafka_delivery_failures': {
                    brokerhost: writer.kafka_delivery_failures
                    for brokerhost, writer in self._writers.items()
                }}
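

# A minimal registration sketch (not part of this module; the app name is
# illustrative). Celery consumer bootsteps such as Consumer and Producer are
# installed on an application through its ``steps`` registry:
#
#     from celery import Celery
#     from gwcelery.kafka.bootsteps import Consumer, Producer
#
#     app = Celery('gwcelery')
#     app.steps['consumer'].add(Consumer)
#     app.steps['consumer'].add(Producer)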