Coverage for gwcelery/voevent/subscriber.py: 71%

14 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-04-25 18:01 +0000

1"""Subclasses of :class:`comet.protocol.VOEventSubscriber` and 

2:class:`comet.protocol.VOEventSubscriberFactory` that allow inspection of the 

3list of active subscribers. 

4""" 

5from comet.protocol.subscriber import VOEventSubscriber as _VOEventSubscriber 

6from comet.protocol.subscriber import \ 

7 VOEventSubscriberFactory as _VOEventSubscriberFactory 

8 

9 

class VOEventSubscriber(_VOEventSubscriber):
    """VOEvent subscriber protocol that records itself on its factory.

    Each instance adds itself to ``self.factory.subscribers`` when its
    connection is made and removes itself again when the connection is
    lost, so that list can be inspected to see the active subscribers.
    """

    def connectionMade(self, *args):  # noqa: N802
        """Register this protocol with the factory, then call the parent."""
        active = self.factory.subscribers
        active.append(self)
        super().connectionMade(*args)

    def connectionLost(self, *args):  # noqa: N802
        """Unregister this protocol from the factory, then call the parent.

        Returns whatever the parent ``connectionLost`` returns.
        """
        active = self.factory.subscribers
        active.remove(self)
        outcome = super().connectionLost(*args)
        return outcome

19 

20 

class VOEventSubscriberFactory(_VOEventSubscriberFactory):
    """Subscriber factory that exposes a list of its subscriber protocols.

    The ``subscribers`` list starts out empty; :class:`VOEventSubscriber`
    instances append themselves on connection and remove themselves on
    disconnection.
    """

    # Protocol class associated with this factory.
    protocol = VOEventSubscriber

    def __init__(self, *args, **kwargs):
        """Initialise the parent factory, then create the subscriber list.

        All positional and keyword arguments are passed through unchanged
        to the parent constructor.
        """
        super().__init__(*args, **kwargs)
        # Populated and pruned by the protocol instances themselves.
        self.subscribers: list = []