Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1"""Shortcuts for HTCondor commands to manage deployment of GWCelery on LIGO 

2Data Grid clusters. 

3 

4These commands apply to the GWCelery instance that is 

5running in the current working directory. 

6""" 

7import json 

8import os 

9import shlex 

10import subprocess 

11import sys 

12import time 

13 

14from celery.bin.base import Command 

15import lxml.etree 

16import pkg_resources 

17 

# Path to the HTCondor submit description file shipped in the ``data``
# directory one level above the directory containing this module.
# Built with os.path instead of pkg_resources.resource_filename(): for a
# regular (unzipped) install both yield the same path, but pkg_resources
# deprecates resource paths containing "..".
SUBMIT_FILE = os.path.join(
    os.path.dirname(__file__), os.pardir, 'data', 'gwcelery.sub')

19 

20 

def get_constraints():
    """Build the ``-constraint`` option selecting this directory's jobs.

    Returns
    -------
    tuple of str
        A two-element tuple: the ``-constraint`` flag and a ClassAd
        expression that matches jobs whose ``JobBatchName`` is
        ``gwcelery`` and whose ``Iwd`` equals the current working
        directory.
    """
    # JSON string literal escape sequences are a close match to HTCondor
    # ClassAd string literals, so json.dumps serves as the quoting step.
    batch_name = json.dumps('gwcelery')
    workdir = json.dumps(os.getcwd())
    expression = ' && '.join((
        'JobBatchName==' + batch_name,
        'Iwd==' + workdir,
    ))
    return '-constraint', expression

26 

27 

def run_exec(*args):
    """Echo a command line, then replace this process with that command.

    The command is printed to standard output in shell-quoted form, then
    executed with :func:`os.execvp`, which searches ``PATH`` for
    ``args[0]`` and does not return on success.

    Raises
    ------
    OSError
        If the program cannot be executed (e.g. it is not found).
    """
    # Flush explicitly: exec replaces the process image without flushing
    # Python's stdio buffers, so when stdout is a pipe or file the echoed
    # line would otherwise be silently lost.
    print(' '.join(shlex.quote(arg) for arg in args), flush=True)
    os.execvp(args[0], args)

31 

32 

def running():
    """Determine if GWCelery is already running under HTCondor.

    Returns True if ``condor_q`` reports at least one job matching
    :func:`get_constraints` (i.e. this directory's GWCelery jobs), in any
    state, since the constraint does not filter on job status.

    Raises
    ------
    subprocess.CalledProcessError
        If ``condor_q`` exits with a non-zero status.
    """
    command = ['condor_q', '-xml']
    command.extend(get_constraints())
    document = lxml.etree.fromstring(subprocess.check_output(command))
    # condor_q's XML output wraps each job ClassAd in a <c> element
    # (presumably the standard classads XML format), so any <c> element
    # means at least one matching job is queued.
    return document.find('.//c') is not None

38 

39 

def submit(app):
    """Submit all GWCelery jobs to HTCondor (if not already running)."""
    # The docstring above doubles as this subcommand's argparse help text,
    # so it is kept to a single line.
    if running():
        # Refuse to queue a second copy of the jobs for this directory.
        print('error: GWCelery jobs are already running in this directory.\n'
              'First remove existing jobs with "gwcelery condor rm".\n'
              'To see the status of those jobs, run "gwcelery condor q".',
              file=sys.stderr)
        sys.exit(1)
    group_setting = 'accounting_group={}'.format(
        app.conf['condor_accounting_group'])
    run_exec('condor_submit', group_setting, SUBMIT_FILE)

53 

54 

def _wait_until_stopped(timeout):
    """Poll HTCondor about once per second until no GWCelery jobs remain.

    Parameters
    ----------
    timeout : float
        Maximum number of seconds to keep polling.

    Returns
    -------
    bool
        True if no matching jobs remain; False if jobs were still present
        when *timeout* expired.
    """
    deadline = time.monotonic() + timeout
    # The queue is always re-checked after each sleep, so jobs that leave
    # during the final polling interval are not reported as a failure.
    while running():
        if time.monotonic() >= deadline:
            return False
        time.sleep(1)
    return True


def resubmit(app, timeout=60):
    """Remove any running GWCelery jobs and resubmit to HTCondor."""
    # The docstring above doubles as this subcommand's argparse help text,
    # so it is kept to a single line.
    # ``timeout`` is the number of seconds to wait for removed jobs to
    # leave the queue before giving up.
    if running():
        subprocess.check_call(('condor_rm', *get_constraints()))
    # condor_rm can return before the jobs have left the queue, so wait
    # for them to disappear before submitting replacements.
    if not _wait_until_stopped(timeout):
        print('error: Could not stop all GWCelery jobs', file=sys.stderr)
        sys.exit(1)
    accounting_group = app.conf['condor_accounting_group']
    run_exec('condor_submit', 'accounting_group={}'.format(accounting_group),
             SUBMIT_FILE)

71 

72 

def rm(app):
    """Remove all GWCelery jobs."""
    # ``app`` is unused; it is accepted so every subcommand can be invoked
    # uniformly as ``func(app)``.
    argv = ['condor_rm', *get_constraints()]
    run_exec(*argv)

76 

77 

def hold(app):
    """Put all GWCelery jobs on hold."""
    # ``app`` is unused; it is accepted so every subcommand can be invoked
    # uniformly as ``func(app)``.
    constraint_flag, constraint_expr = get_constraints()
    run_exec('condor_hold', constraint_flag, constraint_expr)

81 

82 

def release(app):
    """Release all GWCelery jobs from hold status."""
    # ``app`` is unused; it is accepted so every subcommand can be invoked
    # uniformly as ``func(app)``.
    command = ('condor_release',) + get_constraints()
    run_exec(*command)

86 

87 

def q(app):
    """Show status of all GWCelery jobs."""
    # ``app`` is unused; it is accepted so every subcommand can be invoked
    # uniformly as ``func(app)``. ``-nobatch`` requests condor_q's
    # per-job listing rather than its batch summary.
    argv = ['condor_q', '-nobatch']
    argv.extend(get_constraints())
    run_exec(*argv)

91 

92 

class CondorCommand(Command):
    # Celery command exposing the HTCondor shortcuts in this module as
    # subcommands. Its ``__doc__`` is assigned from the module docstring
    # at module level.

    def add_arguments(self, parser):
        """Register one subcommand per shortcut function.

        Each subcommand is named after its function, uses the function's
        docstring as its help text, and stores the function in the parsed
        options under ``func``.
        """
        subparsers = parser.add_subparsers(dest='subcommand')
        # argparse subcommands are optional by default. Without this, running
        # the command with no subcommand leaves ``func`` as None and ``run``
        # fails with a TypeError instead of a proper usage error.
        subparsers.required = True
        for func in [submit, rm, hold, release, resubmit, q]:
            subparser = subparsers.add_parser(func.__name__, help=func.__doc__)
            subparser.set_defaults(func=func)

    def run(self, func=None, **kwargs):
        """Invoke the selected shortcut function with the Celery app.

        ``func`` is the function stored via ``set_defaults`` in
        ``add_arguments``. Other keyword arguments (presumably the remaining
        parsed options, such as ``subcommand``) are ignored.
        """
        func(self.app)

103 

104 

# Reuse the module docstring as the command class's documentation
# (presumably what Celery displays as this command's description).
setattr(CondorCommand, '__doc__', __doc__)