Coverage for gwcelery/kafka/bootsteps.py: 68%

152 statements  


import json
from functools import cache
from os import path
from threading import Thread

from celery import bootsteps
from celery.concurrency import solo
from celery.utils.log import get_logger
from confluent_kafka.error import KafkaException
from fastavro.schema import parse_schema
from hop import Stream, auth
from hop.io import list_topics
from hop.models import AvroBlob, JSONBlob
from xdg.BaseDirectory import xdg_config_home

from ..util import read_json
from .signals import kafka_record_consumed

__all__ = ('Producer', 'Consumer')

log = get_logger(__name__)

@cache
def schema():
    # The order does not matter other than that the Alert schema must be
    # loaded last because it references the other schema. All of the schema
    # are saved in named_schemas, but we only need to save a reference to the
    # Alert schema to write the packet.
    # NOTE Specifying expand=True when calling parse_schema is okay when only
    # one schema contains references to other schema; in our case only the
    # alerts schema contains references to other schema. More complicated
    # relationships between schema, though, can lead to behavior that does
    # not conform to the Avro spec, and a different method will need to be
    # used to load the schema. See
    # https://github.com/fastavro/fastavro/issues/624 for more info.
    named_schemas = {}
    for s in ['igwn.alerts.v1_0.ExternalCoincInfo.avsc',
              'igwn.alerts.v1_0.EventInfo.avsc',
              'igwn.alerts.v1_0.AlertType.avsc',
              'igwn.alerts.v1_0.Alert.avsc']:
        schema = parse_schema(read_json('igwn_gwalert_schema', s),
                              named_schemas, expand=True)

    return schema
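
# --- Illustrative usage (not part of the original module) ---
# A minimal sketch of how the cached Alert schema is meant to be consumed:
# the parsed schema is handed to hop's AvroBlob together with a single alert
# dictionary, which is exactly what AvroBlobWrapper below does.
#
#     alert_schema = schema()          # cached after the first call
#     blob = AvroBlob([alert_dict], alert_schema)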

def _load_hopauth_map():
    hop_auth = auth.load_auth()
    with open(path.join(xdg_config_home,
                        'gwcelery/kafka_credential_map.json'),
              'r') as fo:
        kafka_credential_map = json.load(fo)

    return hop_auth, kafka_credential_map
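
# --- Illustrative example (not part of the original module) ---
# A hypothetical layout for kafka_credential_map.json under xdg_config_home
# (typically ~/.config/gwcelery/), keyed first by the 'producer'/'consumer'
# prefix and then by the logical broker/topic name used in the application's
# Kafka configuration; the names and usernames below are placeholders:
#
#     {
#         "producer": {"example-broker": "example-producer-username"},
#         "consumer": {"example-topic": "example-consumer-username"}
#     }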

class AvroBlobWrapper(AvroBlob):

    def __init__(self, payload):
        return super().__init__([payload], schema())

class KafkaBase:

    def __init__(self, name, config, prefix):
        self.name = name
        self._config = config
        if config.get('auth') is not False:
            # Users only add 'auth' to the config to disable authentication
            self._credential = self.get_auth(prefix)
        else:
            # Don't use credentials
            self._credential = False
        self._hop_stream = Stream(self._credential)

        # FIXME Drop the get_payload_content method once
        # https://github.com/scimma/hop-client/pull/190 is merged
        if config['suffix'] == 'avro':
            self.serialization_model = AvroBlobWrapper
            self.get_payload_content = lambda payload: payload.content[0]
        elif config['suffix'] == 'json':
            self.serialization_model = JSONBlob
            self.get_payload_content = lambda payload: payload.content
        else:
            raise NotImplementedError(
                'Supported serialization method required for alert notices'
            )

    def get_auth(self, prefix):
        hop_auth, kafka_credential_map = _load_hopauth_map()

        # kafka_credential_map contains a map between the logical name for a
        # broker topic and the username
        username = kafka_credential_map.get(prefix, {}).get(self.name)
        if username is None:
            raise ValueError('Unable to find {} entry in kafka credential map '
                             'for {}'.format(prefix, self.name))

        # hop auth contains a map between usernames and passwords/hostnames
        target_auth = None
        for cred in hop_auth:
            if cred.username != username:
                continue
            target_auth = cred
            break
        else:
            raise ValueError('Unable to find entry in hop auth file for '
                             'username {}'.format(username))
        return target_auth
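
# --- Illustrative configuration (not part of the original module) ---
# A hypothetical per-topic entry of the kind KafkaBase consumes; the keys
# used above are 'url', 'suffix', and (optionally) 'auth', and the URL below
# is a placeholder:
#
#     {'url': 'kafka://example-broker.example.org/example-topic',
#      'suffix': 'avro'}
#
# Setting 'auth': False in an entry disables authentication for that
# connection.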

class KafkaListener(KafkaBase):

    def __init__(self, name, config):
        super().__init__(name, config, 'consumer')
        self._open_hop_stream = None
        self.running = False
        # Don't kill the worker if the listener can't connect
        try:
            self._open_hop_stream = self._hop_stream.open(config['url'], 'r')
        except KafkaException:
            log.exception('Connection to %s failed', self._config["url"])
        except ValueError:
            # The hop client will raise a ValueError if the topic doesn't
            # exist on the broker
            log.exception('Connection to %s failed', self._config["url"])

    def listen(self):
        self.running = True
        # Restart the consumer when non-fatal errors come up, similar to
        # gwcelery.igwn_alert.IGWNAlertClient
        while self.running:
            try:
                for message in self._open_hop_stream:
                    # Send signal
                    kafka_record_consumed.send(
                        None,
                        name=self.name,
                        record=self.get_payload_content(message)
                    )
            except KafkaException as exception:
                err = exception.args[0]
                if self.running is False:
                    # The close attempt in the KafkaListener stop method
                    # throws a KafkaException that's caught by this
                    # try/except, so we just have to catch this case for the
                    # worker to shut down gracefully
                    pass
                elif err.fatal():
                    # Stop running and close before raising the error
                    self.running = False
                    self._open_hop_stream.close()
                    raise
                else:
                    log.warning(
                        "non-fatal error from kafka: {}".format(err.name))
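
# --- Illustrative usage (not part of the original module) ---
# A minimal sketch of how a handler elsewhere in the application might
# subscribe to the records that KafkaListener.listen emits; the handler name
# below is hypothetical:
#
#     @kafka_record_consumed.connect
#     def _handle_kafka_record(sender, name=None, record=None, **kwargs):
#         log.info('received a record from %s', name)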

class KafkaWriter(KafkaBase):
    """Write Kafka topics and monitor health."""

    def __init__(self, name, config):
        super().__init__(name, config, 'producer')
        self._open_hop_stream = self._hop_stream.open(
            config['url'], 'w',
            message_max_bytes=1024 * 1024 * 8,
            compression_type='zstd')

        # Set up a flag for failed delivery of messages
        self.kafka_delivery_failures = False

    def kafka_topic_up(self):
        '''Check for problems in the broker and topic. Returns True if the
        broker and topic appear to be up, returns False otherwise.'''
        kafka_url = self._config['url']
        _, _, broker, topic = kafka_url.split('/')
        try:
            topics = list_topics(kafka_url, auth=self._credential, timeout=5)
            if topics[topic].error is None:
                log.info(f'{kafka_url} appears to be functioning properly')
                return True
            else:
                log.error(f'{topic} at {broker} appears to be down')
                return False
        except KafkaException:
            log.error(f'{broker} appears to be down')
            return False
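
    # For reference (illustrative comment, not part of the original module):
    # the split('/') in kafka_topic_up assumes URLs of the form
    # 'kafka://broker-hostname/topic-name', which splits into
    # ['kafka:', '', 'broker-hostname', 'topic-name'].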

    def _delivery_cb(self, kafka_error, message):
        # FIXME Get rid of the if-else logic once
        # https://github.com/scimma/hop-client/pull/190 is merged
        if self._config['suffix'] == 'avro':
            record = AvroBlob.deserialize(message.value()).content[0]
        else:
            record = JSONBlob.deserialize(message.value()).content
        kafka_url = self._config['url']
        if kafka_error is None:
            self.kafka_delivery_failures = False
        else:
            log.error(f'Received error code {kafka_error.code()} '
                      f'({kafka_error.name()}, {kafka_error.str()}) '
                      f'when delivering {record["superevent_id"]} '
                      f'{record["alert_type"]} alert to {kafka_url}')
            self.kafka_delivery_failures = True

    def write(self, payload):
        self._open_hop_stream.write(payload,
                                    delivery_callback=self._delivery_cb)
        self._open_hop_stream.flush()

class KafkaBootStep(bootsteps.ConsumerStep):
    """Generic boot step to limit us to appropriate kinds of workers.

    Only include this boot step in workers that are configured to listen to
    the ``kafka`` queue.
    """

    def create(self, consumer):
        if not isinstance(consumer.pool, solo.TaskPool):
            raise RuntimeError(
                'The Kafka broker only works with the "solo" task pool. '
                'Start the worker with "--queues=kafka --pool=solo".')
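
# --- Illustrative usage (not part of the original module) ---
# A sketch of how a Kafka-only worker might be launched so that these boot
# steps are included. The error message in create() above suggests
# "--queues=kafka --pool=solo", while the include_if checks below key on the
# 'kafka-consumer' and 'kafka-producer' queues; the exact invocation depends
# on the deployment's queue layout:
#
#     gwcelery worker --queues=kafka-consumer,kafka-producer --pool=solo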

class Consumer(KafkaBootStep):
    """Run MOU Kafka consumers in background threads."""

    name = 'Kafka consumer'

    def include_if(self, consumer):
        return 'kafka-consumer' in consumer.app.amqp.queues

    def start(self, consumer):
        log.info(f'Starting {self.name}, topics: ' +
                 ' '.join(config['url'] for config in
                          consumer.app.conf['kafka_consumer_config'].values()))
        self._listeners = {
            key: KafkaListener(key, config) for key, config in
            consumer.app.conf['kafka_consumer_config'].items()
        }
        self.threads = [
            Thread(target=s.listen, name=f'{key}_KafkaConsumerThread') for key,
            s in self._listeners.items() if s._open_hop_stream is not None
        ]
        for thread in self.threads:
            thread.start()

    def stop(self, consumer):
        log.info('Closing connection to topics: ' +
                 ' '.join(listener._config['url'] for listener in
                          self._listeners.values() if listener._open_hop_stream
                          is not None))
        for s in self._listeners.values():
            s.running = False
            if s._open_hop_stream is not None:
                s._open_hop_stream.close()

        for thread in self.threads:
            thread.join()

    def info(self, consumer):
        return {
            'active_kafka_consumers': {listener.name for listener in
                                       self._listeners.values() if
                                       listener._open_hop_stream is not
                                       None}
        }

class Producer(KafkaBootStep):
    """Run the global Kafka producers in a background thread.

    Flags that document the health of the connections are made available to
    :ref:`inspection <celery:worker-inspect>` with the ``gwcelery inspect
    stats`` command under the ``kafka_topic_up`` and
    ``kafka_delivery_failures`` keys.
    """

    name = 'Kafka producer'

    def include_if(self, consumer):
        return 'kafka-producer' in consumer.app.amqp.queues

    def start(self, consumer):
        log.info(f'Starting {self.name}, topics: ' +
                 ' '.join(config['url'] for config in
                          consumer.app.conf['kafka_alert_config'].values()))
        consumer.app.conf['kafka_streams'] = self._writers = {
            brokerhost: KafkaWriter(brokerhost, config) for brokerhost, config
            in consumer.app.conf['kafka_alert_config'].items()
        }

    def stop(self, consumer):
        log.info('Closing connection to topics: ' +
                 ' '.join(config['url'] for config in
                          consumer.app.conf['kafka_alert_config'].values()))
        for s in self._writers.values():
            s._open_hop_stream.close()

    def info(self, consumer):
        return {'kafka_topic_up': {
                    brokerhost: writer.kafka_topic_up() for brokerhost,
                    writer in self._writers.items()
                },
                'kafka_delivery_failures': {
                    brokerhost: writer.kafka_delivery_failures for
                    brokerhost, writer in self._writers.items()
                }}
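
# --- Illustrative inspection output (not part of the original module) ---
# A hypothetical shape of the extra keys that Producer.info contributes to
# ``gwcelery inspect stats`` for a single configured broker; the broker name
# below is a placeholder:
#
#     {'kafka_topic_up': {'example-broker': True},
#      'kafka_delivery_failures': {'example-broker': False}}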