Coverage for gwcelery/tasks/inference.py: 99% (327 statements)
1"""Source Parameter Estimation with LALInference, Bilby, and RapidPE."""
2import glob
3import json
4import os
5import subprocess
6import urllib
7from distutils.dir_util import mkpath
8from distutils.spawn import find_executable
10import numpy as np
11from bilby_pipe.bilbyargparser import BilbyConfigFileParser
12from bilby_pipe.utils import convert_string_to_dict
13from celery import group
14from celery.exceptions import Ignore
16from .. import app
17from ..jinja import env
18from . import condor, gracedb
20_RAPIDPE_NO_GSTLAL_TRIGGER_EXIT_CODE = 100
22RAPIDPE_GETENV = [
23 "DEFAULT_SEGMENT_SERVER", "GWDATAFIND_SERVER",
24 "LAL_DATA_PATH", "LD_LIBRARY_PATH", "LIBRARY_PATH",
25 "NDSSERVER", "PATH", "PYTHONPATH",
26]
27"""Names of environment variables to include in RapidPE HTCondor submit
28files."""
30RAPIDPE_ENVIRONMENT = {
31 "RIFT_LOWLATENCY": "True",
32}
33"""Names and values of environment variables to include in RapidPE HTCondor
34submit files."""
37def _find_appropriate_cal_env(trigtime, dir_name):
38 """Return the path to the calibration uncertainties estimated at the time
39 before and closest to the trigger time. If there are no calibration
40 uncertainties estimated before the trigger time, return the oldest one. The
41 gpstimes at which the calibration uncertainties were estimated and the
42 names of the files containing the uncertaintes are saved in
43 [HLV]_CalEnvs.txt.
45 Parameters
46 ----------
47 trigtime : float
48 The trigger time of a target event
49 dir_name : str
50 The path to the directory where files containing calibration
51 uncertainties exist
53 Return
54 ------
55 path : str
56 The path to the calibration uncertainties appropriate for a target
57 event
59 """
60 filename, = glob.glob(os.path.join(dir_name, '[HLV]_CalEnvs.txt'))
61 calibration_index = np.atleast_1d(
62 np.recfromtxt(filename, names=['gpstime', 'filename'])
63 )
64 gpstimes = calibration_index['gpstime']
65 candidate_gpstimes = gpstimes < trigtime
66 if np.any(candidate_gpstimes):
67 idx = np.argmax(gpstimes * candidate_gpstimes)
68 appropriate_cal = calibration_index['filename'][idx]
69 else:
70 appropriate_cal = calibration_index['filename'][np.argmin(gpstimes)]
71 return os.path.join(dir_name, appropriate_cal.decode('utf-8'))
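

# A minimal sketch (illustrative only, not used by the pipeline) of the
# selection rule above, assuming a plain in-memory list of (gpstime,
# filename) pairs instead of the [HLV]_CalEnvs.txt index: pick the newest
# entry strictly before the trigger time, or fall back to the oldest entry
# if none precedes it.
def _example_pick_cal_env(index, trigtime):
    """Illustrative mirror of the logic in _find_appropriate_cal_env.

    >>> _example_pick_cal_env([(100.0, 'a.txt'), (200.0, 'b.txt')], 150.0)
    'a.txt'
    """
    before = [entry for entry in index if entry[0] < trigtime]
    if before:
        return max(before)[1]  # newest entry before the trigger time
    return min(index)[1]       # no earlier entry: fall back to the oldest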


def prepare_lalinference_ini(event, superevent_id):
    """Determine LALInference configurations and return ini file content.

    Parameters
    ----------
    event : dict
        The json contents of a target G event retrieved from
        gracedb.get_event(), whose mass and spin information are used to
        determine analysis settings.
    superevent_id : str
        The GraceDB ID of a target superevent

    Returns
    -------
    ini_contents : str

    """
    # Get template of .ini file
    ini_template = env.get_template('lalinference.jinja2')

    # fill out the ini template and return the resultant content
    singleinspiraltable = event['extra_attributes']['SingleInspiral']
    trigtime = event['gpstime']
    executables = {'datafind': 'gw_data_find',
                   'mergeNSscript': 'lalinference_nest2pos',
                   'mergeMCMCscript': 'cbcBayesMCMC2pos',
                   'combinePTMCMCh5script': 'cbcBayesCombinePTMCMCh5s',
                   'resultspage': 'cbcBayesPostProc',
                   'segfind': 'ligolw_segment_query',
                   'ligolw_print': 'ligolw_print',
                   'coherencetest': 'lalinference_coherence_test',
                   'lalinferencenest': 'lalinference_nest',
                   'lalinferencemcmc': 'lalinference_mcmc',
                   'lalinferencebambi': 'lalinference_bambi',
                   'lalinferencedatadump': 'lalinference_datadump',
                   'ligo-skymap-from-samples': 'true',
                   'ligo-skymap-plot': 'true',
                   'processareas': 'process_areas',
                   'computeroqweights': 'lalinference_compute_roq_weights',
                   'mpiwrapper': 'lalinference_mpi_wrapper',
                   'gracedb': 'gracedb',
                   'ppanalysis': 'cbcBayesPPAnalysis',
                   'pos_to_sim_inspiral': 'cbcBayesPosToSimInspiral',
                   'bayeswave': 'BayesWave',
                   'bayeswavepost': 'BayesWavePost'}
    ini_settings = {
        'gracedb_host': app.conf['gracedb_host'],
        'types': app.conf['low_latency_frame_types'],
        'channels': app.conf['strain_channel_names'],
        'state_vector_channels': app.conf['state_vector_channel_names'],
        'webdir': os.path.join(
            app.conf['pe_results_path'], superevent_id, 'lalinference'
        ),
        'paths': [{'name': name, 'path': find_executable(executable)}
                  for name, executable in executables.items()],
        'h1_calibration': _find_appropriate_cal_env(
            trigtime,
            '/home/cbc/pe/O3/calibrationenvelopes/LIGO_Hanford'
        ),
        'l1_calibration': _find_appropriate_cal_env(
            trigtime,
            '/home/cbc/pe/O3/calibrationenvelopes/LIGO_Livingston'
        ),
        'v1_calibration': _find_appropriate_cal_env(
            trigtime,
            '/home/cbc/pe/O3/calibrationenvelopes/Virgo'
        ),
        'mc': min([sngl['mchirp'] for sngl in singleinspiraltable]),
        'q': min([sngl['mass2'] / sngl['mass1']
                  for sngl in singleinspiraltable]),
        'mpirun': find_executable('mpirun')
    }
    return ini_template.render(ini_settings)
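

# For example (illustrative values), with two single-inspiral rows having
# (mchirp, mass1, mass2) = (1.2, 1.4, 1.4) and (1.3, 2.0, 1.0), the template
# above receives mc = min(1.2, 1.3) = 1.2 and
# q = min(1.4 / 1.4, 1.0 / 2.0) = 0.5.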


@app.task(shared=False)
def _setup_dag_for_lalinference(coinc, rundir, event, superevent_id):
    """Create DAG for a lalinference run and return the path to the DAG.

    Parameters
    ----------
    coinc : bytes
        Byte contents of ``coinc.xml``. The PSD is expected to be embedded.
    rundir : str
        The path to a run directory where the DAG file is created.
    event : dict
        The json contents of a target G event retrieved from
        gracedb.get_event(), whose mass and spin information are used to
        determine analysis settings.
    superevent_id : str
        The GraceDB ID of a target superevent

    Returns
    -------
    path_to_dag : str
        The path to the .dag file

    """
    # write down coinc.xml in the run directory
    path_to_coinc = os.path.join(rundir, 'coinc.xml')
    with open(path_to_coinc, 'wb') as f:
        f.write(coinc)

    # write down and upload ini file
    ini_contents = prepare_lalinference_ini(event, superevent_id)
    path_to_ini = os.path.join(rundir, 'online_lalinference_pe.ini')
    with open(path_to_ini, 'w') as f:
        f.write(ini_contents)
    gracedb.upload.delay(
        ini_contents, filename=os.path.basename(path_to_ini),
        graceid=superevent_id,
        message=('Automatically generated LALInference configuration file'
                 ' for this event.'),
        tags='pe')

    try:
        subprocess.run(
            ['lalinference_pipe', '--run-path', rundir,
             '--coinc', path_to_coinc, path_to_ini, '--psd', path_to_coinc],
            capture_output=True, check=True)
    except subprocess.CalledProcessError as e:
        contents = b'args:\n' + json.dumps(e.args[1]).encode('utf-8') + \
            b'\n\nstdout:\n' + e.stdout + b'\n\nstderr:\n' + e.stderr
        gracedb.upload.delay(
            filecontents=contents, filename='lalinference_dag.log',
            graceid=superevent_id,
            message='Failed to prepare DAG for lalinference', tags='pe'
        )
        raise

    return os.path.join(rundir, 'multidag.dag')
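

# The except block above (and its twins in the bilby and rapidpe setup
# tasks below) assembles the uploaded error report from the failed command.
# A minimal sketch of that format as a standalone helper (hypothetical;
# this module inlines the expression instead):
def _example_format_called_process_error(e):
    """Illustrative only: command args plus captured stdout/stderr."""
    return (b'args:\n' + json.dumps(e.args[1]).encode('utf-8') +
            b'\n\nstdout:\n' + e.stdout + b'\n\nstderr:\n' + e.stderr)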


@app.task(shared=False)
def _setup_dag_for_bilby(
        coinc_bayestar, rundir, event, superevent_id, mode="production"
):
    """Create DAG for a bilby run and return the path to the DAG.

    Parameters
    ----------
    coinc_bayestar : tuple
        Byte contents of ``coinc.xml`` and ``bayestar.multiorder.fits``.
    rundir : str
        The path to a run directory where the DAG file is created
    event : dict
        The json contents of a target G event retrieved from
        gracedb.get_event(), whose mass and spin information are used to
        determine analysis settings.
    superevent_id : str
        The GraceDB ID of a target superevent
    mode : str
        Analysis mode, allowed options are "production" and "fast_test",
        default is "production".

    Returns
    -------
    path_to_dag : str
        The path to the .dag file

    Notes
    -----
    `--channel-dict o3replay` is added to bilby_pipe_gracedb arguments when
    the gracedb host is different from `gracedb.ligo.org` or
    `gracedb-test.ligo.org`. The Condor queue is set to `Online_PE` if the
    gracedb host is `gracedb.ligo.org`, and `Online_PE_MDC` otherwise.

    """
    path_to_json = os.path.join(rundir, 'event.json')
    with open(path_to_json, 'w') as f:
        json.dump(event, f, indent=2)

    coinc, bayestar = coinc_bayestar
    path_to_psd = os.path.join(rundir, 'coinc.xml')
    with open(path_to_psd, 'wb') as f:
        f.write(coinc)
    path_to_bayestar = os.path.join(rundir, 'bayestar.multiorder.fits')
    with open(path_to_bayestar, 'wb') as f:
        f.write(bayestar)

    path_to_webdir = os.path.join(
        app.conf['pe_results_path'], superevent_id, 'bilby', mode
    )

    path_to_settings = os.path.join(rundir, 'settings.json')
    setup_arg = ['bilby_pipe_gracedb', '--webdir', path_to_webdir,
                 '--outdir', rundir, '--json', path_to_json,
                 '--psd-file', path_to_psd, '--skymap-file', path_to_bayestar,
                 '--settings', path_to_settings]
    settings = {'summarypages_arguments': {'gracedb': event['graceid'],
                                           'no_ligo_skymap': True},
                'accounting_user': 'soichiro.morisaki'}
    if app.conf['gracedb_host'] != 'gracedb.ligo.org':
        settings['queue'] = 'Online_PE_MDC'
    else:
        settings['queue'] = 'Online_PE'
        settings['accounting'] = 'ligo.prod.o4.cbc.pe.bilby'
    # FIXME: using live data for gracedb-test events should be reconsidered
    # when we have a better idea to differentiate MDC and real events.
    if app.conf['gracedb_host'] not in [
        'gracedb.ligo.org', 'gracedb-test.ligo.org'
    ]:
        setup_arg += ['--channel-dict', 'o3replay']

    trigger_chirp_mass = event['extra_attributes']['CoincInspiral']['mchirp']
    if trigger_chirp_mass < 0.6:
        raise ValueError(
            "No bilby settings available for trigger chirp mass of"
            f" {trigger_chirp_mass}Msun."
        )
    if mode == 'production':
        settings.update(
            {
                'sampler_kwargs': {'naccept': 60, 'nlive': 500,
                                   'npool': 24, 'sample': 'acceptance-walk'},
                'n_parallel': 3,
                'request_cpus': 24,
                'spline_calibration_nodes': 10,
                'request_memory_generation': 8.0
            }
        )
        # use low-spin IMRPhenomD below the chirp mass of
        # (m1, m2) = (3, 1) Msun, assuming a binary neutron star
        if trigger_chirp_mass < 1.465:
            likelihood_mode = 'lowspin_phenomd_fhigh1024_roq'
            settings['sampler_kwargs']['naccept'] = 10
        # use IMRPhenomPv2 with a mass ratio upper bound of 8 below the
        # chirp mass of (m1, m2) = (8, 1) Msun
        elif trigger_chirp_mass < 2.243:
            likelihood_mode = 'phenompv2_bns_roq'
        # use IMRPhenomPv2 with a mass ratio upper bound of 20 in the
        # chirp-mass range where IMRPhenomXPHM ROQ bases are not available
        elif trigger_chirp_mass < 12:
            likelihood_mode = 'low_q_phenompv2_roq'
        else:
            likelihood_mode = 'phenomxphm_roq'
            settings['request_memory_generation'] = 36.0
            settings['request_memory'] = 16.0
        setup_arg += ['--cbc-likelihood-mode', likelihood_mode]
    elif mode == 'fast_test':
        setup_arg += ["--sampler-kwargs", "FastTest"]
        if trigger_chirp_mass < 3.9:
            setup_arg += ['--cbc-likelihood-mode', 'phenompv2_bns_roq']
            settings['request_memory_generation'] = 8.0
    else:
        raise ValueError(f"mode: {mode} not recognized.")

    with open(path_to_settings, 'w') as f:
        json.dump(settings, f, indent=2)

    try:
        subprocess.run(setup_arg, capture_output=True, check=True)
    except subprocess.CalledProcessError as e:
        contents = b'args:\n' + json.dumps(e.args[1]).encode('utf-8') + \
            b'\n\nstdout:\n' + e.stdout + b'\n\nstderr:\n' + e.stderr
        gracedb.upload.delay(
            filecontents=contents, filename='bilby_dag.log',
            graceid=superevent_id,
            message=f'Failed to prepare DAG for {mode}-mode bilby', tags='pe'
        )
        raise
    else:
        # Upload the bilby ini file to GraceDB
        with open(os.path.join(rundir, 'bilby_config.ini'), 'r') as f:
            ini_contents = f.read()
        if mode == 'production':
            filename = 'bilby_config.ini'
        else:
            filename = f'bilby_{mode}_config.ini'
        gracedb.upload.delay(
            ini_contents, filename=filename, graceid=superevent_id,
            message=(f'Automatically generated {mode}-mode Bilby'
                     ' configuration file for this event.'),
            tags='pe')

    path_to_dag, = glob.glob(os.path.join(rundir, 'submit/dag*.submit'))
    return path_to_dag
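

# A minimal sketch (illustrative only, not used by the pipeline) of the
# production-mode likelihood selection above, keyed on the trigger chirp
# mass in Msun; the associated sampler and memory tweaks are omitted here.
def _example_likelihood_mode(trigger_chirp_mass):
    """Illustrative mirror of the production-mode ROQ branching."""
    if trigger_chirp_mass < 1.465:    # below about (m1, m2) = (3, 1) Msun
        return 'lowspin_phenomd_fhigh1024_roq'
    elif trigger_chirp_mass < 2.243:  # below about (m1, m2) = (8, 1) Msun
        return 'phenompv2_bns_roq'
    elif trigger_chirp_mass < 12:     # no IMRPhenomXPHM ROQ bases here
        return 'low_q_phenompv2_roq'
    else:
        return 'phenomxphm_roq'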


@app.task(shared=False)
def _setup_dag_for_rapidpe(rundir, superevent_id, event):
    """Create DAG for a rapidpe run and return the path to the DAG.

    Parameters
    ----------
    rundir : str
        The path to a run directory where the DAG file is created
    superevent_id : str
        The GraceDB ID of a target superevent
    event : dict
        The json contents of a target G event retrieved from
        gracedb.get_event(), whose SNR is used to determine analysis
        settings.

    Returns
    -------
    path_to_dag : str
        The path to the .dag file

    """
    gracedb_host = app.conf['gracedb_host']

    settings = app.conf['rapidpe_settings']
    trigger_snr = event['extra_attributes']['CoincInspiral']['snr']
    high_snr_trigger = trigger_snr >= 37.5

    # dump ini file
    ini_template = env.get_template('rapidpe.jinja2')
    ini_contents = ini_template.render(
        {'rundir': rundir,
         'webdir': os.path.join(
             app.conf['pe_results_path'], superevent_id, 'rapidpe'
         ),
         'gracedb_url': f'https://{gracedb_host}/api',
         'superevent_id': superevent_id,
         'run_mode': settings['run_mode'],
         'frame_data_types': app.conf['low_latency_frame_types'],
         'accounting_group': settings['accounting_group'],
         'use_cprofile': settings['use_cprofile'],
         'gracedb_host': gracedb_host,
         'high_snr_trigger': high_snr_trigger,
         'getenv': RAPIDPE_GETENV,
         'environment': RAPIDPE_ENVIRONMENT})
    path_to_ini = os.path.join(rundir, 'rapidpe.ini')
    with open(path_to_ini, 'w') as f:
        f.write(ini_contents)
    gracedb.upload.delay(
        ini_contents, filename=os.path.basename(path_to_ini),
        graceid=superevent_id,
        message=('Automatically generated RapidPE-RIFT configuration file'
                 ' for this event.'),
        tags='pe')

    # set up dag
    try:
        subprocess.run(['rapidpe-rift-pipe', path_to_ini],
                       capture_output=True, check=True)
    except subprocess.CalledProcessError as e:
        contents = b'args:\n' + json.dumps(e.args[1]).encode('utf-8') + \
            b'\n\nstdout:\n' + e.stdout + b'\n\nstderr:\n' + e.stderr

        message = 'Failed to prepare DAG for Rapid PE'

        fail_gracefully = False
        if e.returncode == _RAPIDPE_NO_GSTLAL_TRIGGER_EXIT_CODE:
            fail_gracefully = True
            message += ": no GstLAL trigger available"

        gracedb.upload.delay(
            filecontents=contents, filename='rapidpe_dag.log',
            graceid=superevent_id,
            message=message, tags='pe'
        )

        if fail_gracefully:
            # End the task without logging it as a failure
            raise Ignore()
        else:
            # End the task with the unhandled error logged
            raise

    # return path to dag
    dag = os.path.join(rundir, "event_all_iterations.dag")
    return dag


@app.task(shared=False)
def _condor_no_submit(path_to_dag, include_env=None):
    """Run 'condor_submit_dag -no_submit' and return the path to .sub file."""
    args = ['condor_submit_dag']

    if include_env is not None:
        args += ['-include_env', ','.join(include_env)]

    args += ['-no_submit', path_to_dag]
    subprocess.run(args, capture_output=True, check=True)
    return '{}.condor.sub'.format(path_to_dag)
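

# For example (illustrative paths), preparing a RapidPE DAG runs roughly:
#   condor_submit_dag -include_env <comma-separated RAPIDPE_GETENV> \
#       -no_submit /path/to/rundir/event_all_iterations.dag
# and the task returns
#   '/path/to/rundir/event_all_iterations.dag.condor.sub'.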


def dag_prepare_task(rundir, event, superevent_id, pe_pipeline, **kwargs):
    """Return a canvas of tasks to prepare DAG.

    Parameters
    ----------
    rundir : str
        The path to a run directory where the DAG file is created
    event : dict
        The json contents of a target G event retrieved from
        gracedb.get_event(), whose mass and spin information are used to
        determine analysis settings.
    superevent_id : str
        The GraceDB ID of a target superevent
    pe_pipeline : str
        The parameter estimation pipeline used,
        lalinference, bilby, or rapidpe.

    Returns
    -------
    canvas : canvas of tasks
        The canvas of tasks to prepare DAG

    """
    # List of environment variables `condor_submit_dag` should be aware of.
    include_env = None

    if pe_pipeline == 'lalinference':
        canvas = gracedb.download.si('coinc.xml', event['graceid']) | \
            _setup_dag_for_lalinference.s(rundir, event, superevent_id)
    elif pe_pipeline == 'bilby':
        canvas = group(
            gracedb.download.si('coinc.xml', event['graceid']),
            gracedb.download.si('bayestar.multiorder.fits', event['graceid'])
        ) | _setup_dag_for_bilby.s(
            rundir, event, superevent_id, kwargs['bilby_mode']
        )
    elif pe_pipeline == 'rapidpe':
        canvas = _setup_dag_for_rapidpe.s(rundir, superevent_id, event)
        include_env = RAPIDPE_GETENV
    else:
        raise NotImplementedError(f'Unknown PE pipeline {pe_pipeline}.')

    canvas |= _condor_no_submit.s(include_env=include_env)

    return canvas
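

# Conceptually (illustrative), the canvas returned for bilby is the chain
#   group(download 'coinc.xml', download 'bayestar.multiorder.fits')
#       | _setup_dag_for_bilby(...) | _condor_no_submit(...)
# so DAG generation starts only after both downloads have completed.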


def _find_paths_from_name(directory, name):
    """Return the paths of files or directories with given name under the
    specified directory.

    Parameters
    ----------
    directory : str
        Name of the directory under which the target file or directory is
        searched for.
    name : str
        Name of target files or directories

    Returns
    -------
    paths : generator
        Paths to the target files or directories

    """
    return glob.iglob(os.path.join(directory, '**', name), recursive=True)
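

# For example (illustrative), _find_paths_from_name(rundir, '*.err') lazily
# yields every .err file at any depth under rundir (e.g.
# rundir/submit/job.err), since glob.iglob is called with recursive=True
# and '**' matches any number of intermediate directories.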


@app.task(ignore_result=True, shared=False)
def _clean_up_bilby(rundir):
    """Remove large data products produced by bilby.

    Parameters
    ----------
    rundir : str

    """
    for p in glob.glob(
        os.path.join(rundir, "data/*_generation_roq_weights.hdf5")
    ):
        os.remove(p)
    for p in glob.glob(
        os.path.join(rundir, "data/*_generation_data_dump.pickle")
    ):
        os.remove(p)


@app.task(ignore_result=True, shared=False)
def job_error_notification(request, exc, traceback,
                           superevent_id, rundir, analysis):
    """Upload notification when condor.submit terminates unexpectedly.

    Parameters
    ----------
    request : Context (placeholder)
        Task request variables
    exc : Exception
        Exception raised by condor.submit
    traceback : str (placeholder)
        Traceback message from a task
    superevent_id : str
        The GraceDB ID of a target superevent
    rundir : str
        The run directory for PE
    analysis : str
        Analysis name used as a label in uploaded messages

    Notes
    -----
    Some large bilby data products are cleaned up after the notification if
    the gracedb host is different from `gracedb.ligo.org`.

    """
    if isinstance(exc, condor.JobRunning):
        subprocess.run(['condor_rm', str(exc.args[0]['Cluster'])])
        canvas = gracedb.upload.si(
            filecontents=None, filename=None, graceid=superevent_id,
            tags='pe',
            message=f'The {analysis} condor job was aborted by gwcelery '
                    'due to its long run time.'
        )
    elif isinstance(exc, condor.JobAborted):
        canvas = gracedb.upload.si(
            filecontents=None, filename=None, graceid=superevent_id,
            tags='pe',
            message=f'The {analysis} condor job was aborted.'
        )
    else:
        canvas = gracedb.upload.si(
            filecontents=None, filename=None, graceid=superevent_id,
            tags='pe',
            message=f'The {analysis} condor job failed.'
        )

    if analysis == "rapidpe":
        to_upload = [
            'event_all_iterations.dag.lib.err',
            'marginalize_extrinsic_parameters_iteration_*.dag.lib.err'
        ]
    else:
        to_upload = ['*.log', '*.err', '*.out']
    for filename in to_upload:
        tasks = []
        for path in _find_paths_from_name(rundir, filename):
            with open(path, 'rb') as f:
                contents = f.read()
            if contents:
                # put a .log suffix on log file names so that users can
                # directly read the contents instead of downloading them
                # when they click file names
                tasks.append(gracedb.upload.si(
                    filecontents=contents,
                    filename=os.path.basename(path) + '.log',
                    graceid=superevent_id,
                    message=f'A log file for {analysis} condor job.',
                    tags='pe'
                ))
        canvas |= group(tasks)

    if "bilby" in analysis and app.conf['gracedb_host'] != 'gracedb.ligo.org':
        canvas |= _clean_up_bilby.si(rundir)

    canvas.delay()


def _upload_tasks_lalinference(rundir, superevent_id):
    """Return canvas of tasks to upload LALInference results.

    Parameters
    ----------
    rundir : str
        The path to a run directory
    superevent_id : str
        The GraceDB ID of a target superevent

    Returns
    -------
    tasks : canvas
        The workflow for uploading LALInference results

    """
    pe_results_path = os.path.join(
        app.conf['pe_results_path'], superevent_id, 'lalinference'
    )

    # posterior samples
    path, = glob.glob(
        os.path.join(rundir, '**', 'posterior*.hdf5'), recursive=True)
    with open(path, 'rb') as f:
        canvas = gracedb.upload.si(
            f.read(), 'LALInference.posterior_samples.hdf5',
            superevent_id, 'LALInference posterior samples', 'pe')

    # plots
    tasks = []
    for filename, message in [
        ('extrinsic.png', 'LALInference corner plot for extrinsic parameters'),
        ('intrinsic.png', 'LALInference corner plot for intrinsic parameters')
    ]:
        # Here we do not require that only a single png file exists, so that
        # posterior samples are uploaded regardless. The same applies to the
        # other files.
        for path in _find_paths_from_name(pe_results_path, filename):
            with open(path, 'rb') as f:
                tasks.append(gracedb.upload.si(
                    f.read(), f'LALInference.{filename}', superevent_id,
                    message, 'pe'
                ))
    canvas |= group(tasks)

    # psd
    tasks = []
    for path in _find_paths_from_name(rundir, 'glitch_median_PSD_forLI_*.dat'):
        with open(path, 'r') as f:
            tasks.append(gracedb.upload.si(
                f.read(), os.path.basename(path), superevent_id,
                'Bayeswave PSD used for LALInference PE', 'pe'
            ))
    canvas |= group(tasks)

    # dag
    tasks = []
    for path in _find_paths_from_name(rundir, 'lalinference*.dag'):
        with open(path, 'r') as f:
            tasks.append(gracedb.upload.si(
                f.read(), os.path.basename(path), superevent_id,
                'LALInference DAG', 'pe'
            ))
    canvas |= group(tasks)

    # link to results page
    tasks = []
    for path in _find_paths_from_name(pe_results_path, 'posplots.html'):
        baseurl = urllib.parse.urljoin(
            app.conf['pe_results_url'],
            os.path.relpath(path, app.conf['pe_results_path'])
        )
        tasks.append(gracedb.upload.si(
            None, None, superevent_id,
            'Online lalinference parameter estimation finished. '
            f'<a href={baseurl}>results</a>'
        ))
    canvas |= group(tasks)

    return canvas


def _upload_tasks_bilby(rundir, superevent_id, mode):
    """Return canvas of tasks to upload Bilby results.

    Parameters
    ----------
    rundir : str
        The path to a run directory
    superevent_id : str
        The GraceDB ID of a target superevent
    mode : str
        Analysis mode

    Returns
    -------
    tasks : canvas
        The workflow for uploading Bilby results

    Notes
    -----
    Some large bilby data products are cleaned up after the posterior file
    is uploaded if the gracedb host is different from `gracedb.ligo.org`.

    """
    # convert bilby sample file into one compatible with ligo-skymap
    samples_dir = os.path.join(rundir, 'final_result')
    if mode == 'production':
        samples_filename = 'Bilby.posterior_samples.hdf5'
    else:
        samples_filename = f'Bilby.{mode}.posterior_samples.hdf5'
    out_samples = os.path.join(samples_dir, samples_filename)
    in_samples, = glob.glob(os.path.join(samples_dir, '*result.hdf5'))
    subprocess.run(
        ['bilby_pipe_to_ligo_skymap_samples', in_samples, '--out',
         out_samples]
    )

    with open(out_samples, 'rb') as f:
        canvas = gracedb.upload.si(
            f.read(), samples_filename,
            superevent_id, f'{mode}-mode Bilby posterior samples', 'pe')

    if app.conf['gracedb_host'] != 'gracedb.ligo.org':
        canvas |= _clean_up_bilby.si(rundir)

    # pesummary
    pesummary_kwargs = {}
    path_to_ini, = glob.glob(os.path.join(rundir, "*_complete.ini"))
    pesummary_kwargs["config"] = path_to_ini
    config_parser = BilbyConfigFileParser()
    with open(path_to_ini, "r") as f:
        config_content, _, _, _ = config_parser.parse(f)
    pesummary_kwargs["psd"] = convert_string_to_dict(
        config_content["psd-dict"]
    )
    pesummary_kwargs["calibration"] = convert_string_to_dict(
        config_content["spline-calibration-envelope-dict"]
    )
    pesummary_kwargs["approximant"] = config_content["waveform-approximant"]
    pesummary_kwargs["f_low"] = config_content["minimum-frequency"]
    pesummary_kwargs["f_ref"] = config_content["reference-frequency"]
    pesummary_kwargs["label"] = "online"

    webdir = os.path.join(config_content["webdir"], 'pesummary')
    url = urllib.parse.urljoin(
        app.conf['pe_results_url'],
        os.path.relpath(
            os.path.join(webdir, 'home.html'),
            app.conf['pe_results_path']
        )
    )
    canvas = group(
        canvas,
        _pesummary_task(webdir, in_samples, **pesummary_kwargs)
        |
        gracedb.upload.si(
            None, None, superevent_id,
            f'PESummary page for {mode}-mode Bilby is available '
            f'<a href={url}>here</a>',
            'pe'
        )
    )

    return canvas


def _upload_tasks_rapidpe(rundir, superevent_id):
    """Return canvas of tasks to upload RapidPE-RIFT results."""
    summary_path = os.path.join(rundir, "summary")

    url = urllib.parse.urljoin(
        app.conf['pe_results_url'],
        os.path.join(superevent_id, 'rapidpe', 'summarypage.html')
    )
    canvas = gracedb.upload.si(
        None, None, superevent_id,
        f'Summary page for RapidPE-RIFT is available <a href={url}>here</a>',
        ('pe',))
    to_upload = [
        (
            "p_astro.json", "RapidPE_RIFT.p_astro.json",
            "RapidPE-RIFT Pastro results",
            ("pe", "p_astro", "public"),
        ),
    ]
    tasks = []
    for src_basename, dst_filename, description, tags in to_upload:
        src_filename = os.path.join(summary_path, src_basename)
        if os.path.isfile(src_filename):
            with open(src_filename, "rb") as f:
                tasks.append(
                    gracedb.upload.si(
                        f.read(), dst_filename,
                        superevent_id, description, tags))
    canvas |= group(tasks)

    return canvas


@app.task(ignore_result=True, shared=False)
def dag_finished(rundir, superevent_id, pe_pipeline, **kwargs):
    """Upload PE results.

    Parameters
    ----------
    rundir : str
        The path to a run directory where the DAG file exists
    superevent_id : str
        The GraceDB ID of a target superevent
    pe_pipeline : str
        The parameter estimation pipeline used,
        lalinference, bilby, or rapidpe.

    """
    if pe_pipeline == 'lalinference':
        canvas = _upload_tasks_lalinference(rundir, superevent_id)
    elif pe_pipeline == 'bilby':
        canvas = _upload_tasks_bilby(
            rundir, superevent_id, kwargs['bilby_mode'])
    elif pe_pipeline == 'rapidpe':
        canvas = _upload_tasks_rapidpe(rundir, superevent_id)
    else:
        raise NotImplementedError(f'Unknown PE pipeline {pe_pipeline}.')

    canvas.delay()

    # NOTE: check if this should include rapidpe as well
    if pe_pipeline == 'bilby':
        gracedb.create_label.delay('PE_READY', superevent_id)


def _pesummary_task(webdir, samples, **pesummary_kwargs):
    """Return a celery task to submit a pesummary condor job.

    Parameters
    ----------
    webdir : str
        output directory
    samples : str
        path to posterior sample file
    **pesummary_kwargs
        Extra arguments of summarypages

    Returns
    -------
    celery task

    Notes
    -----
    `--disable_interactive --disable_expert` are added and `--redshift_method
    exact --evolve_spins_forwards` are not added to `summarypages` arguments
    when the gracedb host is different from `gracedb.ligo.org`. The Condor
    queue is set to `Online_PE` if the gracedb host is `gracedb.ligo.org`,
    and `Online_PE_MDC` otherwise.

    """
    args = [
        "summarypages", "--webdir", webdir, "--samples", samples, "--gw",
        "--no_ligo_skymap", "--multi_process", "6"
    ]
    for key in pesummary_kwargs:
        if key in ["psd", "calibration"]:
            args += [f"--{key}"]
            for ifo in pesummary_kwargs[key]:
                args += [f'{ifo}:{pesummary_kwargs[key][ifo]}']
        else:
            args += [f"--{key}", pesummary_kwargs[key]]
    condor_kwargs = dict(
        request_memory=16000, request_disk=5000, request_cpus=6,
        accounting_group_user='soichiro.morisaki'
    )
    if app.conf['gracedb_host'] != 'gracedb.ligo.org':
        condor_kwargs['accounting_group'] = 'ligo.dev.o4.cbc.pe.bilby'
        condor_kwargs['requirements'] = '((TARGET.Online_PE_MDC =?= True))'
        condor_kwargs['+Online_PE_MDC'] = True
        args += ["--disable_interactive", "--disable_expert"]
    else:
        condor_kwargs['accounting_group'] = 'ligo.prod.o4.cbc.pe.bilby'
        condor_kwargs['requirements'] = '((TARGET.Online_PE =?= True))'
        condor_kwargs['+Online_PE'] = True
        args += ["--redshift_method", "exact", "--evolve_spins_forwards"]
    return condor.check_output.si(args, **condor_kwargs)
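

# For example (illustrative values), pesummary_kwargs of
#   {'psd': {'H1': 'h1_psd.dat', 'L1': 'l1_psd.dat'}, 'label': 'online'}
# expands to the summarypages arguments
#   --psd H1:h1_psd.dat L1:l1_psd.dat --label online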


# Modified version of condor.submit task with retry kwargs overridden with
# RapidPE-specific settings.
submit_rapidpe = app.task(
    **condor.submit_kwargs,
    **app.conf['rapidpe_condor_retry_kwargs'],
)(condor.submit.run)


@app.task(ignore_result=True, shared=False)
def start_pe(event, superevent_id, pe_pipeline):
    """Run Parameter Estimation on a given event.

    Parameters
    ----------
    event : dict
        The json contents of a target G event retrieved from
        gracedb.get_event(), whose mass and spin information are used to
        determine analysis settings.
    superevent_id : str
        The GraceDB ID of a target superevent
    pe_pipeline : str
        The parameter estimation pipeline used,
        lalinference, bilby, or rapidpe.

    """
    # make an event directory
    pipeline_dir = os.path.expanduser('~/.cache/{}'.format(pe_pipeline))
    mkpath(pipeline_dir)
    event_dir = os.path.join(pipeline_dir, superevent_id)

    if pe_pipeline == 'bilby':
        if (
            app.conf['gracedb_host'] == 'gracedb-playground.ligo.org' and
            event['extra_attributes']['CoincInspiral']['mchirp'] >= 12
        ):
            # Count the number of BBH jobs and do not start a run if it
            # exceeds 5, so that we do not use up disk space. We assume
            # that a job is running if a data dump pickle file exists under
            # the run directory, which is the largest file produced by PE
            # and removed when the run completes.
            number_of_bbh_running = 0
            for p in glob.glob(
                os.path.join(
                    pipeline_dir,
                    "*/*/data/*_generation_data_dump.pickle"
                )
            ):
                path_to_ev = os.path.join(os.path.dirname(p), "../event.json")
                if os.path.exists(path_to_ev):
                    with open(path_to_ev, "r") as f:
                        ev = json.load(f)
                    mc = ev['extra_attributes']['CoincInspiral']['mchirp']
                    if mc >= 12:
                        number_of_bbh_running += 1
            if number_of_bbh_running > 5:
                gracedb.upload.delay(
                    filecontents=None, filename=None, graceid=superevent_id,
                    message='Parameter estimation will not start to save '
                            f'disk space (There are {number_of_bbh_running} '
                            'BBH jobs running).',
                    tags='pe'
                )
                return
        modes = ["production"]
        rundirs = [os.path.join(event_dir, m) for m in modes]
        kwargs_list = [{'bilby_mode': m} for m in modes]
        analyses = [f'{m}-mode bilby' for m in modes]
        condor_submit_task = condor.submit
    elif pe_pipeline == 'rapidpe':
        rundirs = [event_dir]
        kwargs_list = [{'event_pipeline': event["pipeline"]}]
        analyses = [pe_pipeline]
        condor_submit_task = submit_rapidpe
    else:
        rundirs = [event_dir]
        kwargs_list = [{}]
        analyses = [pe_pipeline]
        condor_submit_task = condor.submit

    os.mkdir(event_dir)
    for rundir, kwargs, analysis in zip(rundirs, kwargs_list, analyses):
        mkpath(rundir)

        gracedb.upload.delay(
            filecontents=None, filename=None, graceid=superevent_id,
            message=(f'Starting {analysis} parameter estimation '
                     f'for {event["graceid"]}'),
            tags='pe'
        )

        (
            dag_prepare_task(
                rundir, event, superevent_id, pe_pipeline, **kwargs
            )
            |
            condor_submit_task.s().on_error(
                job_error_notification.s(superevent_id, rundir, analysis)
            )
            |
            dag_finished.si(rundir, superevent_id, pe_pipeline, **kwargs)
        ).delay()