Coverage for gwcelery/tasks/core.py: 85%

46 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-04-17 17:22 +0000

1"""Base classes for other Celery tasks.""" 

2from celery import group 

3from celery.utils.log import get_logger 

4 

5from .. import app 

6 

7log = get_logger(__name__) 

8 

9 

10@app.task(shared=False) 

11def identity(arg=None): 

12 """Identity task (returns its input).""" 

13 return arg 

14 

15 

16@app.task(shared=False) 

17def get_first(args): 

18 """Get the first result of a group. Identity for scalar""" 

19 try: 

20 first, *_ = args 

21 except TypeError: 

22 first = args # if scalar 

23 return first 

24 

25 

26@app.task(shared=False) 

27def get_last(args): 

28 """Get the last result of a group. Identity for scalar""" 

29 try: 

30 *_, last = args 

31 except TypeError: 

32 last = args # if scalar 

33 return last 

34 

35 

36class DispatchHandler(dict): 

37 

38 def process_args(self, *args, **kwargs): 

39 r"""Determine key and callback arguments. 

40 

41 The default implementation treats the first positional argument as the 

42 key. 

43 

44 Parameters 

45 ---------- 

46 \*args 

47 Arguments passed to :meth:`__call__`. 

48 \*\*kwargs 

49 Keyword arguments passed to :meth:`__call__`. 

50 

51 Returns 

52 ------- 

53 key 

54 The key to determine which callback to invoke. 

55 \*args 

56 The arguments to pass to the registered callback. 

57 \*\*kwargs 

58 The keyword arguments to pass to the registered callback. 

59 

60 """ 

61 key, *args = args 

62 return key, args, kwargs 

63 

64 def __call__(self, *keys, **kwargs): 

65 r"""Create a new task and register it as a callback for handling the 

66 given keys. 

67 

68 Parameters 

69 ---------- 

70 \*keys : list 

71 Keys to match 

72 \*\*kwargs 

73 Additional keyword arguments for `celery.Celery.task`. 

74 

75 """ 

76 def wrap(f): 

77 f = app.task(ignore_result=True, **kwargs)(f) 

78 for key in keys: 

79 self.setdefault(key, []).append(f) 

80 return f 

81 

82 return wrap 

83 

84 def dispatch(self, *args, **kwargs): 

85 log.debug('considering dispatch: args=%r, kwargs=%r', args, kwargs) 

86 try: 

87 key, args, kwargs = self.process_args(*args, **kwargs) 

88 except (TypeError, ValueError): 

89 log.exception('error unpacking key') 

90 return 

91 log.debug('unpacked: key=%r, args=%r, kwargs=%r', key, args, kwargs) 

92 

93 try: 

94 matching_handlers = self[key] 

95 except KeyError: 

96 log.warning('ignoring unrecognized key: %r', key) 

97 else: 

98 log.info('calling handlers %r for key %r', matching_handlers, key) 

99 group([handler.s() for handler in matching_handlers]).apply_async( 

100 args, kwargs)