Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1"""Base classes for other Celery tasks.""" 

2from operator import itemgetter 

3 

4from celery import group 

5from celery.utils.log import get_logger 

6 

7from .. import app 

8 

9log = get_logger(__name__) 

10 

11 

12@app.task(shared=False) 

13def identity(arg=None): 

14 """Identity task (returns its input).""" 

15 return arg 

16 

17 

18@app.task(shared=False) 

19def _pack(result, i): 

20 return i, result 

21 

22 

23@app.task(shared=False) 

24def _unpack(results): 

25 return tuple(map(itemgetter(1), sorted(results, key=itemgetter(0)))) 

26 

27 

28@app.task(shared=False) 

29def _first(results): 

30 return min(results, key=itemgetter(0))[1] 

31 

32 

33@app.task(shared=False) 

34def _last(results): 

35 return max(results, key=itemgetter(0))[1] 

36 

37 

38def ordered_group(*args): 

39 """Like :meth:`celery.group`, but preserve order. 

40 

41 This is a temporary workaround for 

42 https://github.com/celery/celery/pull/4858. 

43 """ 

44 return group(arg | _pack.s(i) for i, arg in enumerate(args)) | _unpack.s() 

45 

46 

47def ordered_group_first(*args): 

48 """Like :meth:`celery.group`, but only return the first result.""" 

49 return group(arg | _pack.s(i) for i, arg in enumerate(args)) | _first.s() 

50 

51 

52def ordered_group_last(*args): 

53 """Like :meth:`celery.group`, but only return the last result.""" 

54 return group(arg | _pack.s(i) for i, arg in enumerate(args)) | _last.s() 

55 

56 

57class DispatchHandler(dict): 

58 

59 def process_args(self, *args, **kwargs): 

60 r"""Determine key and callback arguments. 

61 

62 The default implementation treats the first positional argument as the 

63 key. 

64 

65 Parameters 

66 ---------- 

67 \*args 

68 Arguments passed to :meth:`__call__`. 

69 \*\*kwargs 

70 Keyword arguments passed to :meth:`__call__`. 

71 

72 Returns 

73 ------- 

74 key 

75 The key to determine which callback to invoke. 

76 \*args 

77 The arguments to pass to the registered callback. 

78 \*\*kwargs 

79 The keyword arguments to pass to the registered callback. 

80 

81 """ 

82 key, *args = args 

83 return key, args, kwargs 

84 

85 def __call__(self, *keys, **kwargs): 

86 r"""Create a new task and register it as a callback for handling the 

87 given keys. 

88 

89 Parameters 

90 ---------- 

91 \*keys : list 

92 Keys to match 

93 \*\*kwargs 

94 Additional keyword arguments for `celery.Celery.task`. 

95 

96 """ 

97 def wrap(f): 

98 f = app.task(ignore_result=True, **kwargs)(f) 

99 for key in keys: 

100 self.setdefault(key, []).append(f) 

101 return f 

102 

103 return wrap 

104 

105 def dispatch(self, *args, **kwargs): 

106 log.debug('considering dispatch: args=%r, kwargs=%r', args, kwargs) 

107 try: 

108 key, args, kwargs = self.process_args(*args, **kwargs) 

109 except (TypeError, ValueError): 

110 log.exception('error unpacking key') 

111 return 

112 log.debug('unpacked: key=%r, args=%r, kwargs=%r', key, args, kwargs) 

113 

114 try: 

115 matching_handlers = self[key] 

116 except KeyError: 

117 log.warning('ignoring unrecognized key: %r', key) 

118 else: 

119 log.info('calling handlers %r for key %r', matching_handlers, key) 

120 group([handler.s() for handler in matching_handlers]).apply_async( 

121 args, kwargs)