Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

"""Base classes for other Celery tasks.""" 

from operator import itemgetter 

 

from celery import group 

from celery.utils.log import get_logger 

 

from .. import app 

 

log = get_logger(__name__) 

 

 

@app.task(shared=False) 

def identity(arg): 

"""Identity task (returns its input).""" 

return arg 

 

 

@app.task(shared=False) 

def _pack(result, i): 

return i, result 

 

 

@app.task(shared=False) 

def _unpack(results): 

return tuple(map(itemgetter(1), sorted(results, key=itemgetter(0)))) 

 

 

@app.task(shared=False) 

def _first(results): 

return min(results, key=itemgetter(0))[1] 

 

 

@app.task(shared=False) 

def _last(results): 

return max(results, key=itemgetter(0))[1] 

 

 

def ordered_group(*args): 

"""Like :meth:`celery.group`, but preserve order. 

 

This is a temporary workaround for 

https://github.com/celery/celery/pull/4858. 

""" 

return group(arg | _pack.s(i) for i, arg in enumerate(args)) | _unpack.s() 

 

 

def ordered_group_first(*args): 

"""Like :meth:`celery.group`, but only return the first result.""" 

return group(arg | _pack.s(i) for i, arg in enumerate(args)) | _first.s() 

 

 

def ordered_group_last(*args): 

"""Like :meth:`celery.group`, but only return the last result.""" 

return group(arg | _pack.s(i) for i, arg in enumerate(args)) | _last.s() 

 

 

class DispatchHandler(dict): 

 

def process_args(self, *args, **kwargs): 

r"""Determine key and callback arguments. 

 

The default implementation treats the first positional argument as the 

key. 

 

Parameters 

---------- 

\*args 

Arguments passed to :meth:`__call__`. 

\*\*kwargs 

Keyword arguments passed to :meth:`__call__`. 

 

Returns 

------- 

key 

The key to determine which callback to invoke. 

\*args 

The arguments to pass to the registered callback. 

\*\*kwargs 

The keyword arguments to pass to the registered callback. 

 

""" 

key, *args = args 

return key, args, kwargs 

 

def __call__(self, *keys, **kwargs): 

r"""Create a new task and register it as a callback for handling the 

given keys. 

 

Parameters 

---------- 

\*keys : list 

Keys to match 

\*\*kwargs 

Additional keyword arguments for `celery.Celery.task`. 

 

""" 

def wrap(f): 

f = app.task(ignore_result=True, **kwargs)(f) 

for key in keys: 

self.setdefault(key, []).append(f) 

return f 

 

return wrap 

 

def dispatch(self, *args, **kwargs): 

log.debug('considering dispatch: args=%r, kwargs=%r', args, kwargs) 

try: 

key, args, kwargs = self.process_args(*args, **kwargs) 

except (TypeError, ValueError): 

log.exception('error unpacking key') 

return 

log.debug('unpacked: key=%r, args=%r, kwargs=%r', key, args, kwargs) 

 

try: 

matching_handlers = self[key] 

except KeyError: 

log.warning('ignoring unrecognized key: %r', key) 

else: 

log.info('calling handlers %r for key %r', matching_handlers, key) 

group([handler.s() for handler in matching_handlers]).apply_async( 

args, kwargs)