Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

"""Shortcuts for HTCondor commands to manage deployment of GWCelery on LIGO 

Data Grid clusters. 

 

These commands apply to the GWCelery instance that is 

running in the current working directory.""" 

 

import json 

import os 

import shlex 

import subprocess 

import sys 

import time 

 

from celery.bin.base import Command 

import lxml.etree 

import pkg_resources 

 

# HTCondor submit description file passed to condor_submit by ``submit`` and
# ``resubmit``; the path '../data/gwcelery.sub' is resolved relative to this
# module's location.
# NOTE(review): pkg_resources is deprecated by setuptools; consider migrating
# to importlib.resources once the supported Python versions allow it.
SUBMIT_FILE = pkg_resources.resource_filename(__name__, '../data/gwcelery.sub')

 

 

def get_constraints():
    """Return the command-line option selecting this directory's jobs.

    The result is the two-element tuple ``('-constraint', expression)``,
    where *expression* is a ClassAd expression matching jobs whose
    ``JobBatchName`` is ``"gwcelery"`` and whose ``Iwd`` equals the current
    working directory. It is meant to be splatted into ``condor_*`` argv.
    """
    # json.dumps() doubles as a string-literal quoter here: JSON string
    # escape sequences are a close match to HTCondor ClassAd literals.
    quoted_name = json.dumps('gwcelery')
    quoted_cwd = json.dumps(os.getcwd())
    expression = 'JobBatchName=={} && Iwd=={}'.format(quoted_name, quoted_cwd)
    return ('-constraint', expression)

 

 

def run_exec(*args):
    """Echo a command line, then replace the current process with it.

    The command is printed in shell-quoted form so the user can see (and
    copy) exactly what is being run. ``args[0]`` is looked up on ``PATH`` by
    :func:`os.execvp`; on success this function never returns, and on
    failure :func:`os.execvp` raises :class:`OSError`.
    """
    # os.exec*() replaces the process image without flushing Python's stdio
    # buffers, so when stdout is a pipe or file (block-buffered) the echoed
    # command would be silently lost unless it is flushed explicitly.
    print(' '.join(shlex.quote(arg) for arg in args), flush=True)
    os.execvp(args[0], args)

 

 

def running():
    """Report whether any GWCelery job for this directory is in the queue.

    Runs ``condor_q -xml`` restricted by :func:`get_constraints` and returns
    True if the XML output contains at least one ``<c>`` element
    (presumably one per matching job ClassAd), otherwise False. A failing
    ``condor_q`` propagates as :class:`subprocess.CalledProcessError`.
    """
    command = ['condor_q', '-xml']
    command.extend(get_constraints())
    xml_output = subprocess.check_output(command)
    document = lxml.etree.fromstring(xml_output)
    first_ad = document.find('.//c')
    return first_ad is not None

 

 

def submit():
    """Submit all GWCelery jobs to HTCondor (if not already running)."""
    # The docstring above is also this subcommand's --help text (it is
    # passed as ``help=`` in CondorCommand.add_arguments), so keep it as is.
    if running():
        # Refuse to queue a second copy of the jobs for this directory.
        print('error: GWCelery jobs are already running in this directory.\n'
              'You must first remove existing jobs with '
              '"gwcelery condor rm".\n'
              'To see the status of those jobs, run "gwcelery condor q".',
              file=sys.stderr)
        sys.exit(1)
    # Replaces the current process with condor_submit; no return on success.
    run_exec('condor_submit', SUBMIT_FILE)

 

 

def _wait_until_stopped(timeout):
    """Poll :func:`running` about once a second for up to *timeout* seconds.

    Return True as soon as no matching jobs remain in the queue, or False if
    they are still present when the time limit runs out.
    """
    start = time.monotonic()
    while time.monotonic() - start < timeout:
        if not running():
            return True
        time.sleep(1)
    return False


def resubmit():
    """Remove any running GWCelery jobs and resubmit to HTCondor."""
    # The docstring above is also this subcommand's --help text; keep it.
    if running():
        subprocess.check_call(('condor_rm', *get_constraints()))
        # Jobs may linger in the queue after condor_rm returns, so wait (for
        # at most 60 s) until they are gone before submitting replacements.
        if not _wait_until_stopped(60):
            print('error: Could not stop all GWCelery jobs', file=sys.stderr)
            sys.exit(1)
    # Replaces the current process with condor_submit; no return on success.
    run_exec('condor_submit', SUBMIT_FILE)

 

 

def rm():
    """Remove all GWCelery jobs."""
    # The docstring above is shown as this subcommand's help text.
    # Hands off to condor_rm restricted to this directory's GWCelery jobs.
    argv = ('condor_rm',) + get_constraints()
    run_exec(*argv)

 

 

def hold():
    """Put all GWCelery jobs on hold."""
    # The docstring above is shown as this subcommand's help text.
    # Hands off to condor_hold restricted to this directory's GWCelery jobs.
    command = ['condor_hold']
    command += get_constraints()
    run_exec(*command)

 

 

def release():
    """Release all GWCelery jobs from hold status."""
    # The docstring above is shown as this subcommand's help text.
    # Hands off to condor_release restricted to this directory's jobs.
    flag, expression = get_constraints()
    run_exec('condor_release', flag, expression)

 

 

def q():
    """Show status of all GWCelery jobs."""
    # The docstring above is shown as this subcommand's help text.
    # '-nobatch' presumably requests one line per job instead of HTCondor's
    # batch-summary view.
    argv = ('condor_q', '-nobatch') + get_constraints()
    run_exec(*argv)

 

 

class CondorCommand(Command):
    # No class docstring on purpose: __doc__ is overwritten with the module
    # docstring right after the class body (presumably so that celery shows
    # it as this command's description).

    def add_arguments(self, parser):
        """Register one argparse subcommand per HTCondor shortcut.

        Each function's ``__name__`` becomes the subcommand name (e.g.
        ``gwcelery condor rm``) and its docstring the subcommand help text;
        the function itself is stored as the ``func`` default for dispatch.
        """
        subparsers = parser.add_subparsers(dest='subcommand')
        # Without this, invoking the command with no subcommand leaves
        # ``func`` unset and run() would call None, crashing with a TypeError
        # instead of printing a usage error. A ``dest`` is set so that the
        # "required" error message can name the missing argument.
        subparsers.required = True
        for func in [submit, rm, hold, release, resubmit, q]:
            subparser = subparsers.add_parser(func.__name__, help=func.__doc__)
            subparser.set_defaults(func=func)

    def run(self, func=None, **kwargs):
        """Invoke the shortcut chosen on the command line.

        *func* is the callable stored via ``set_defaults`` for the selected
        subcommand; any other parsed options arrive in *kwargs* and are
        ignored.
        """
        func()


CondorCommand.__doc__ = __doc__