Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1import threading 

2 

3from celery import bootsteps 

4from celery.concurrency import solo 

5 

6from .util import get_host_port, get_local_ivo, get_network 

7from .logging import log 

8from .signals import voevent_received 

9 

10__all__ = ('Broadcaster', 'Reactor', 'Receiver') 

11 

12 

class VOEventBootStep(bootsteps.ConsumerStep):
    """Common base class for the VOEvent boot steps.

    It restricts the step to workers that serve the ``voevent`` queue and
    insists that those workers run with the "solo" task pool.
    """

    def include_if(self, consumer):
        """Decide whether this boot step belongs in the given consumer.

        Returns a true value only if ``voevent`` is among the queues known to
        the consumer's application.
        """
        configured_queues = consumer.app.amqp.queues
        return 'voevent' in configured_queues

    def create(self, consumer):
        """Check that the worker was started with the "solo" task pool.

        Raises :class:`RuntimeError` if the consumer's pool is anything other
        than an instance of :class:`celery.concurrency.solo.TaskPool`.
        """
        if isinstance(consumer.pool, solo.TaskPool):
            return
        raise RuntimeError(
            'The VOEvent broker only works with the "solo" task pool. '
            'Start the worker with "--queues=voevent --pool=solo".')

    def start(self, consumer):
        """Log that this boot step is starting."""
        step_name = self.name
        log.info('Starting %s', step_name)

    def stop(self, consumer):
        """Log that this boot step is stopping."""
        step_name = self.name
        log.info('Stopping %s', step_name)

33 

34 

class Reactor(VOEventBootStep):
    """Run the global Twisted reactor in background thread.

    The Twisted reactor is a global run loop that drives all Twisted services
    and operations. This boot step starts the Twisted reactor in a background
    thread when the Celery consumer starts, and stops the thread when the
    Consumer terminates.
    """

    name = 'Twisted reactor'

    def __init__(self, consumer, **kwargs):
        """Initialise the step; the reactor thread is built in :meth:`create`.
        """
        # Keep the cooperative initialiser chain intact so that any set-up
        # done by the base boot step classes is not silently skipped.
        super().__init__(consumer, **kwargs)
        self._thread = None

    def create(self, consumer):
        """Validate the worker and prepare (but do not start) the thread."""
        from twisted.internet import reactor

        super().create(consumer)
        # ``False`` is passed positionally as the first argument of
        # ``reactor.run`` (``installSignalHandlers`` in Twisted's API); the
        # reactor runs outside the main thread here.
        self._thread = threading.Thread(target=reactor.run, args=(False,),
                                        name='TwistedReactorThread')

    def start(self, consumer):
        """Start the background thread that runs the reactor."""
        super().start(consumer)
        self._thread.start()

    def stop(self, consumer):
        """Ask the reactor to stop, then wait for its thread to finish."""
        from twisted.internet import reactor

        super().stop(consumer)
        # The reactor lives in another thread, so the stop request is handed
        # over with callFromThread rather than calling reactor.stop directly.
        reactor.callFromThread(reactor.stop)
        self._thread.join()

66 

67 

class TwistedService(VOEventBootStep):
    """A generic bootstep to create, start, and stop a Twisted service.

    Subclasses implement :meth:`create_service` to build the service object;
    this class takes care of starting and stopping it from the reactor.
    """

    # Depend on the Reactor boot step in addition to the inherited
    # requirements.
    requires = VOEventBootStep.requires + (Reactor,)

    def __init__(self, consumer, **kwargs):
        """Initialise the step; the service is built later in :meth:`create`.
        """
        # Keep the cooperative initialiser chain intact so that any set-up
        # done by the base boot step classes is not silently skipped.
        super().__init__(consumer, **kwargs)
        self._service = None

    def create(self, consumer):
        """Validate the worker, then build and remember the service."""
        super().create(consumer)
        self._service = self.create_service(consumer)

    def create_service(self, consumer):
        """Build the Twisted service object for this step.

        Must be overridden by subclasses; the base implementation raises
        :class:`NotImplementedError`.
        """
        raise NotImplementedError

    def start(self, consumer):
        """Schedule the service's ``startService`` on the reactor."""
        from twisted.internet import reactor

        super().start(consumer)
        # Hand the call over to the reactor instead of invoking it directly
        # from the calling thread.
        reactor.callFromThread(self._service.startService)

    def stop(self, consumer):
        """Schedule the service's ``stopService`` on the reactor."""
        from twisted.internet import reactor

        super().stop(consumer)
        reactor.callFromThread(self._service.stopService)

94 

95 

class Broadcaster(TwistedService):
    """Comet-based VOEvent broadcaster.

    This boot step serves a
    :class:`comet.protocol.broadcaster.VOEventBroadcasterFactory` over TCP. It
    starts after the :class:`~gwcelery.voevent.bootsteps.Reactor` bootstep.

    The following :doc:`configuration options <configuration>` are read:

    *   ``voevent_broadcaster_address``: The address to bind to, in
        :samp:`{host}:{port}` format.
    *   ``voevent_broadcaster_whitelist``: A list of hostnames, IP addresses,
        or CIDR address ranges from which to accept connections.

    The list of active connections is available for :ref:`inspection
    <celery:worker-inspect>` with the ``gwcelery inspect stats`` command under
    the ``voevent-broker-peers`` key.
    """

    name = 'VOEvent broadcaster'

    def create_service(self, consumer):
        """Build the TCP server that serves the broadcaster factory.

        The bare broadcaster factory is also stored in the application
        configuration under ``voevent_broadcaster_factory`` and kept on the
        step for use by :meth:`info`.
        """
        from comet.protocol.broadcaster import VOEventBroadcasterFactory
        from comet.utility import WhitelistingFactory
        from twisted.application.internet import TCPServer

        settings = consumer.app.conf
        ivo = get_local_ivo(consumer.app)
        bind_host, bind_port = get_host_port(
            settings['voevent_broadcaster_address'])
        networks = [
            get_network(entry)
            for entry in settings['voevent_broadcaster_whitelist']]

        broadcaster_factory = VOEventBroadcasterFactory(ivo, 0)
        settings['voevent_broadcaster_factory'] = broadcaster_factory
        self._factory = broadcaster_factory

        # Wrap the factory in a whitelist filter only when the whitelist is
        # non-empty; otherwise the bare factory is served directly.
        if networks:
            served_factory = WhitelistingFactory(
                broadcaster_factory, networks, 'subscription')
        else:
            served_factory = broadcaster_factory
        return TCPServer(bind_port, served_factory, interface=bind_host)

    def info(self, consumer):
        """Report the peer host of every current broadcaster connection."""
        peer_hosts = []
        for connection in self._factory.broadcasters:
            peer_hosts.append(connection.transport.getPeer().host)
        return {'voevent-broker-peers': peer_hosts}

135 

136 

class Receiver(TwistedService):
    """VOEvent receiver.

    This boot step connects a
    :class:`comet.protocol.subscriber.VOEventSubscriberFactory` to a remote
    address over TCP. It starts after the
    :class:`~gwcelery.voevent.bootsteps.Reactor` bootstep.

    The following :doc:`configuration option <configuration>` is read:

    *   ``voevent_receiver_address``: The address to connect to, in
        :samp:`{host}:{port}` format.

    The list of active connections is available for :ref:`inspection
    <celery:worker-inspect>` with the ``gwcelery inspect stats`` command under
    the ``voevent-receiver-peers`` key.
    """

    name = 'VOEvent receiver'

    # In addition to the inherited requirements, depend on Celery's Tasks
    # consumer step.
    requires = TwistedService.requires + (
        'celery.worker.consumer.tasks:Tasks',)

    def create_service(self, consumer):
        """Build the TCP client that connects the subscriber factory.

        The factory is given a single handler, which forwards each received
        event to the ``voevent_received`` signal.
        """
        from comet.icomet import IHandler
        from twisted.application.internet import TCPClient
        from zope.interface import implementer
        from .subscriber import VOEventSubscriberFactory

        @implementer(IHandler)
        class Handler:
            """Comet handler that relays events to ``voevent_received``."""

            def __call__(self, event):
                from twisted.internet import reactor
                # Send the signal through reactor.callInThread rather than
                # calling voevent_received.send directly in the handler.
                reactor.callInThread(
                    voevent_received.send, sender=None, xml_document=event)

        settings = consumer.app.conf
        ivo = get_local_ivo(consumer.app)
        remote_host, remote_port = get_host_port(
            settings['voevent_receiver_address'])
        subscriber_factory = VOEventSubscriberFactory(
            local_ivo=ivo, handlers=[Handler()])
        self._factory = subscriber_factory
        return TCPClient(remote_host, remote_port, subscriber_factory)

    def info(self, consumer):
        """Report the peer host of every current subscriber connection."""
        peer_hosts = []
        for connection in self._factory.subscribers:
            peer_hosts.append(connection.transport.getPeer().host)
        return {'voevent-receiver-peers': peer_hosts}