Coverage for gwcelery/tasks/inference.py : 100%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""Source Parameter Estimation with LALInference and Bilby."""
2from distutils.spawn import find_executable
3from distutils.dir_util import mkpath
4import glob
5import itertools
6import json
7import os
8import shutil
9import subprocess
10import tempfile
11import urllib
13from celery import group
14from gwdatafind import find_urls
15import numpy as np
16from requests.exceptions import HTTPError
18from .. import app
19from ..jinja import env
20from .core import ordered_group
21from . import condor
22from . import gracedb
# Canonical file name for the LALInference configuration file, both when it is
# uploaded to GraceDB and when it is written into the run directory.
ini_name = 'online_lalinference_pe.ini'

# Mapping from the executable labels referenced in the online-PE ini template
# to the executable names that are resolved with ``find_executable`` when the
# ini file is rendered.
# NOTE(review): the 'ligo-skymap-from-samples' and 'ligo-skymap-plot' entries
# point at the shell no-op ``true``, presumably because sky-map generation is
# handled elsewhere in gwcelery — confirm before changing.
executables = {'datafind': 'gw_data_find',
               'mergeNSscript': 'lalinference_nest2pos',
               'mergeMCMCscript': 'cbcBayesMCMC2pos',
               'combinePTMCMCh5script': 'cbcBayesCombinePTMCMCh5s',
               'resultspage': 'cbcBayesPostProc',
               'segfind': 'ligolw_segment_query',
               'ligolw_print': 'ligolw_print',
               'coherencetest': 'lalinference_coherence_test',
               'lalinferencenest': 'lalinference_nest',
               'lalinferencemcmc': 'lalinference_mcmc',
               'lalinferencebambi': 'lalinference_bambi',
               'lalinferencedatadump': 'lalinference_datadump',
               'ligo-skymap-from-samples': 'true',
               'ligo-skymap-plot': 'true',
               'processareas': 'process_areas',
               'computeroqweights': 'lalinference_compute_roq_weights',
               'mpiwrapper': 'lalinference_mpi_wrapper',
               'gracedb': 'gracedb',
               'ppanalysis': 'cbcBayesPPAnalysis',
               'pos_to_sim_inspiral': 'cbcBayesPosToSimInspiral',
               'bayeswave': 'BayesWave',
               'bayeswavepost': 'BayesWavePost'}
def _data_exists(end, frametype_dict):
    """Return True if gwdatafind can locate frame files covering the end time
    for every interferometer in ``frametype_dict``.

    Parameters
    ----------
    end : float
        GPS end time to query for.
    frametype_dict : dict
        Mapping from interferometer name (e.g. ``'H1'``) to frame type.
    """
    # One gwdatafind query per detector; the data are usable only if every
    # detector returned at least one URL.
    counts = (
        len(find_urls(ifo[0], frametype, end, end + 1))
        for ifo, frametype in frametype_dict.items()
    )
    return min(counts) > 0
class NotEnoughData(Exception):
    """Raised when the frame data found so far are insufficient, typically
    because of the latency of data transfer.
    """
@app.task(bind=True, autoretry_for=(NotEnoughData, ), default_retry_delay=1,
          max_retries=86400, retry_backoff=True, shared=False)
def query_data(self, trigtime):
    """Keep querying for frame data with gwdatafind until they are found and
    return the corresponding frame types.

    Low-latency frame types are preferred over high-latency ones.  The task
    automatically retries itself; if data are still missing after
    86400 retries (= 1 day), :exc:`NotEnoughData` propagates.
    """
    end = trigtime + 2
    # Check the preferred (low-latency) frame types first.
    for conf_key in ('low_latency_frame_types', 'high_latency_frame_types'):
        frametypes = app.conf[conf_key]
        if _data_exists(end, frametypes):
            return frametypes
    raise NotEnoughData
@app.task(ignore_result=True, shared=False)
def upload_no_frame_files(request, exc, traceback, superevent_id):
    """Upload a GraceDB notification when no frame files are found.

    Parameters
    ----------
    request : Context (placeholder)
        Task request variables
    exc : Exception
        Exception raised by the failed task
    traceback : str (placeholder)
        Traceback message from a task
    superevent_id : str
        The GraceDB ID of a target superevent

    """
    # Only react to the data-not-found failure mode; any other exception is
    # not ours to report here.
    if not isinstance(exc, NotEnoughData):
        return
    gracedb.upload.delay(
        filecontents=None, filename=None,
        graceid=superevent_id,
        message='Frame files have not been found.',
        tags='pe'
    )
109def _find_appropriate_cal_env(trigtime, dir_name):
110 """Return the path to the calibration uncertainties estimated at the time
111 before and closest to the trigger time. If there are no calibration
112 uncertainties estimated before the trigger time, return the oldest one. The
113 gpstimes at which the calibration uncertainties were estimated and the
114 names of the files containing the uncertaintes are saved in
115 [HLV]_CalEnvs.txt.
117 Parameters
118 ----------
119 trigtime : float
120 The trigger time of a target event
121 dir_name : str
122 The path to the directory where files containing calibration
123 uncertainties exist
125 Return
126 ------
127 path : str
128 The path to the calibration uncertainties appropriate for a target
129 event
131 """
132 filename, = glob.glob(os.path.join(dir_name, '[HLV]_CalEnvs.txt'))
133 calibration_index = np.atleast_1d(
134 np.recfromtxt(filename, names=['gpstime', 'filename'])
135 )
136 gpstimes = calibration_index['gpstime']
137 candidate_gpstimes = gpstimes < trigtime
138 if np.any(candidate_gpstimes):
139 idx = np.argmax(gpstimes * candidate_gpstimes)
140 appropriate_cal = calibration_index['filename'][idx]
141 else:
142 appropriate_cal = calibration_index['filename'][np.argmin(gpstimes)]
143 return os.path.join(dir_name, appropriate_cal.decode('utf-8'))
@app.task(shared=False)
def prepare_ini(frametype_dict, event, superevent_id=None):
    """Determine appropriate PE settings for the target event and return the
    ini-file content for the LALInference pipeline.

    Parameters
    ----------
    frametype_dict : dict
        Mapping from interferometer name to frame type.
    event : dict
        The json contents retrieved from gracedb.get_event()
    superevent_id : str, optional
        The GraceDB ID of a target superevent; when given, the rendered rota
        ini file is also uploaded to GraceDB.
    """
    # Template shared by the rota and online flavours of the ini file.
    template = env.get_template('online_pe.jinja2')

    sngl_inspiral = event['extra_attributes']['SingleInspiral']
    trigtime = event['gpstime']
    calenv_base = '/home/cbc/pe/O3/calibrationenvelopes'
    settings = {
        'gracedb_host': app.conf['gracedb_host'],
        'types': frametype_dict,
        'channels': app.conf['strain_channel_names'],
        'state_vector_channels': app.conf['state_vector_channel_names'],
        'webdir': os.path.join(
            app.conf['pe_results_path'], event['graceid'], 'lalinference'
        ),
        'paths': [{'name': name, 'path': find_executable(executable)}
                  for name, executable in executables.items()],
        'h1_calibration': _find_appropriate_cal_env(
            trigtime, os.path.join(calenv_base, 'LIGO_Hanford')
        ),
        'l1_calibration': _find_appropriate_cal_env(
            trigtime, os.path.join(calenv_base, 'LIGO_Livingston')
        ),
        'v1_calibration': _find_appropriate_cal_env(
            trigtime, os.path.join(calenv_base, 'Virgo')
        ),
        # Smallest chirp mass and most extreme mass ratio across triggers.
        'mc': min(sngl['mchirp'] for sngl in sngl_inspiral),
        'q': min(sngl['mass2'] / sngl['mass1'] for sngl in sngl_inspiral),
        'mpirun': find_executable('mpirun')
    }
    ini_rota = template.render(settings)
    settings['use_of_ini'] = 'online'
    ini_online = template.render(settings)

    # Upload the rota flavour of the LALInference ini file to GraceDB.
    if superevent_id is not None:
        gracedb.upload.delay(
            ini_rota, filename=ini_name, graceid=superevent_id,
            message=('Automatically generated LALInference configuration file'
                     ' for this event.'),
            tags='pe')

    return ini_online
def pre_pe_tasks(event, superevent_id):
    """Return canvas of tasks executed before parameter estimation starts."""
    # Find data first; if none ever shows up, post a notification instead.
    data_query = query_data.s(event['gpstime']).on_error(
        upload_no_frame_files.s(superevent_id)
    )
    return data_query | prepare_ini.s(event, superevent_id)
@app.task(shared=False)
def _setup_dag_for_lalinference(coinc_psd, ini_contents,
                                rundir, superevent_id):
    """Create a DAG for a lalinference run and return the path to the DAG.

    Parameters
    ----------
    coinc_psd : tuple of byte contents
        Tuple of the byte contents of ``coinc.xml`` and ``psd.xml.gz``
    ini_contents : str
        The content of online_lalinference_pe.ini
    rundir : str
        The path to a run directory where the DAG file exits
    superevent_id : str
        The GraceDB ID of a target superevent

    Returns
    -------
    path_to_dag : str
        The path to the .dag file

    """
    coinc_contents, psd_contents = coinc_psd

    # Write coinc.xml into the run directory.
    coinc_path = os.path.join(rundir, 'coinc.xml')
    with open(coinc_path, 'wb') as f:
        f.write(coinc_contents)

    # Write psd.xml.gz only when a PSD was actually available.
    if psd_contents is None:
        psd_arg = []
    else:
        psd_path = os.path.join(rundir, 'psd.xml.gz')
        with open(psd_path, 'wb') as f:
            f.write(psd_contents)
        psd_arg = ['--psd', psd_path]

    # Write the ini file into the run directory.
    ini_path = os.path.join(rundir, ini_name)
    with open(ini_path, 'w') as f:
        f.write(ini_contents)

    try:
        subprocess.run(
            ['lalinference_pipe', '--run-path', rundir,
             '--coinc', coinc_path, ini_path] + psd_arg,
            capture_output=True, check=True)
    except subprocess.CalledProcessError as e:
        # Preserve the failed command line plus its stdout/stderr so the
        # failure can be diagnosed from GraceDB alone.
        contents = b'args:\n' + json.dumps(e.args[1]).encode('utf-8') + \
            b'\n\nstdout:\n' + e.stdout + b'\n\nstderr:\n' + e.stderr
        gracedb.upload.delay(
            filecontents=contents, filename='pe_dag.log',
            graceid=superevent_id,
            message='Failed to prepare DAG for lalinference', tags='pe'
        )
        shutil.rmtree(rundir)
        raise
    else:
        # Remove the ini file so that people do not accidentally use this ini
        # file and hence online-PE-only nodes.
        os.remove(ini_path)

    return os.path.join(rundir, 'multidag.dag')
@app.task(shared=False)
def _setup_dag_for_bilby(event, rundir, preferred_event_id, superevent_id):
    """Create DAG for a bilby run and return the path to DAG.

    Parameters
    ----------
    event : json contents
        The json contents retrieved from gracedb.get_event()
    rundir : str
        The path to a run directory where the DAG file exits
    preferred_event_id : str
        The GraceDB ID of a target preferred event
    superevent_id : str
        The GraceDB ID of a target superevent

    Returns
    -------
    path_to_dag : str
        The path to the .dag file

    """
    path_to_json = os.path.join(rundir, 'event.json')
    with open(path_to_json, 'w') as f:
        json.dump(event, f, indent=2)

    path_to_webdir = os.path.join(
        app.conf['pe_results_path'], preferred_event_id, 'bilby'
    )

    setup_arg = ['bilby_pipe_gracedb', '--webdir', path_to_webdir,
                 '--outdir', rundir, '--json', path_to_json, '--online-pe',
                 '--convert-to-flat-in-component-mass']

    # Anywhere other than the production GraceDB server, run a fast test
    # configuration on O2 replay data instead of the full analysis.
    if not app.conf['gracedb_host'] == 'gracedb.ligo.org':
        setup_arg += ['--channel-dict', 'o2replay',
                      '--sampler-kwargs', 'FastTest']
    try:
        subprocess.run(setup_arg, capture_output=True, check=True)
    except subprocess.CalledProcessError as e:
        # Preserve the failed command line plus its stdout/stderr so the
        # failure can be diagnosed from GraceDB alone.
        contents = b'args:\n' + json.dumps(e.args[1]).encode('utf-8') + \
            b'\n\nstdout:\n' + e.stdout + b'\n\nstderr:\n' + e.stderr
        gracedb.upload.delay(
            filecontents=contents, filename='pe_dag.log',
            graceid=superevent_id,
            message='Failed to prepare DAG for bilby', tags='pe'
        )
        shutil.rmtree(rundir)
        raise
    else:
        # Uploads bilby ini file to GraceDB
        group(upload_results_tasks(
            rundir, 'bilby_config.ini', superevent_id,
            'Automatically generated Bilby configuration file',
            'pe', 'online_bilby_pe.ini')).delay()

    # Fixed: removed stray debug ``print(path_to_dag)`` left in the task.
    path_to_dag, = glob.glob(os.path.join(rundir, 'submit/dag*.submit'))
    return path_to_dag
@app.task(shared=False)
def _condor_no_submit(path_to_dag):
    """Run 'condor_submit_dag -no_submit' and return the path to .sub file."""
    # condor_submit_dag writes <dag>.condor.sub next to the DAG file.
    subprocess.run(['condor_submit_dag', '-no_submit', path_to_dag],
                   capture_output=True, check=True)
    return f'{path_to_dag}.condor.sub'
@app.task(shared=False)
def dag_prepare_task(rundir, superevent_id, preferred_event_id, pe_pipeline,
                     ini_contents=None):
    """Return a canvas of tasks to prepare DAG.

    Parameters
    ----------
    rundir : str
        The path to a run directory where the DAG file exits
    superevent_id : str
        The GraceDB ID of a target superevent
    preferred_event_id : str
        The GraceDB ID of a target preferred event
    pe_pipeline : str
        The parameter estimation pipeline used
        Either 'lalinference' OR 'bilby'
    ini_contents : str
        The content of online_lalinference_pe.ini
        Required if pe_pipeline == 'lalinference'

    Returns
    -------
    canvas : canvas of tasks
        The canvas of tasks to prepare DAG

    """
    if pe_pipeline == 'lalinference':
        # Download coinc and PSD in parallel, then build the DAG.
        setup = ordered_group(
            gracedb.download.si('coinc.xml', preferred_event_id),
            _download_psd.si(preferred_event_id)
        ) | _setup_dag_for_lalinference.s(ini_contents, rundir, superevent_id)
    elif pe_pipeline == 'bilby':
        setup = gracedb.get_event.si(preferred_event_id) | \
            _setup_dag_for_bilby.s(rundir, preferred_event_id, superevent_id)
    else:
        raise NotImplementedError(f'Unknown PE pipeline {pe_pipeline}.')
    # Both pipelines finish by generating the condor .sub file.
    return setup | _condor_no_submit.s()
379def _find_paths_from_name(directory, name):
380 """Return the paths of files or directories with given name under the
381 specfied directory
383 Parameters
384 ----------
385 directory : string
386 Name of directory under which the target file or directory is searched
387 for.
388 name : string
389 Name of target files or directories
391 Returns
392 -------
393 paths : generator
394 Paths to the target files or directories
396 """
397 return glob.iglob(os.path.join(directory, '**', name), recursive=True)
@app.task(ignore_result=True, shared=False)
def job_error_notification(request, exc, traceback,
                           superevent_id, rundir, pe_pipeline):
    """Upload notification when condor.submit terminates unexpectedly.

    Parameters
    ----------
    request : Context (placeholder)
        Task request variables
    exc : Exception
        Exception raised by condor.submit
    traceback : str (placeholder)
        Traceback message from a task
    superevent_id : str
        The GraceDB ID of a target superevent
    rundir : str
        The run directory for PE
    pe_pipeline : str
        The parameter estimation pipeline used
        Either lalinference OR bilby

    """
    if isinstance(exc, condor.JobAborted):
        gracedb.upload.delay(
            filecontents=None, filename=None, graceid=superevent_id, tags='pe',
            message='The {} condor job was aborted.'.format(pe_pipeline)
        )
    elif isinstance(exc, condor.JobFailed):
        gracedb.upload.delay(
            filecontents=None, filename=None, graceid=superevent_id, tags='pe',
            message='The {} condor job failed.'.format(pe_pipeline)
        )
    # Collect every .log, .err and .out file under the run directory and
    # upload the non-empty ones for post-mortem debugging.
    candidate_paths = itertools.chain(
        _find_paths_from_name(rundir, '*.log'),
        _find_paths_from_name(rundir, '*.err'),
        _find_paths_from_name(rundir, '*.out')
    )
    for path in candidate_paths:
        with open(path, 'rb') as f:
            contents = f.read()
        if not contents:
            continue
        # put .log suffix in log file names so that users can directly
        # read the contents instead of downloading them when they click
        # file names
        gracedb.upload.delay(
            filecontents=contents,
            filename=os.path.basename(path) + '.log',
            graceid=superevent_id,
            message='A log file for {} condor job.'.format(pe_pipeline),
            tags='pe'
        )
@app.task(ignore_result=True, shared=False)
def _upload_url(pe_results_path, graceid, pe_pipeline):
    """Upload url of a page containing all of the plots."""
    # The summary-page file name differs between pipelines.
    page_names = {'lalinference': 'posplots.html', 'bilby': 'home.html'}
    if pe_pipeline not in page_names:
        raise NotImplementedError(f'Unknown PE pipeline {pe_pipeline}.')
    path_to_posplots, = _find_paths_from_name(
        pe_results_path, page_names[pe_pipeline]
    )

    # Translate the local results path into a public URL.
    baseurl = urllib.parse.urljoin(
        app.conf['pe_results_url'],
        os.path.relpath(
            path_to_posplots,
            app.conf['pe_results_path']
        )
    )
    gracedb.upload.delay(
        filecontents=None, filename=None, graceid=graceid,
        message=('Online {} parameter estimation finished.'
                 '<a href={}>results</a>').format(pe_pipeline, baseurl),
        tags='pe'
    )
def upload_results_tasks(pe_results_path, filename, graceid, message, tag,
                         uploaded_filename=None):
    """Return tasks to get the contents of PE result files and upload them to
    GraceDB.

    Parameters
    ----------
    pe_results_path : string
        Directory under which the target file located.
    filename : string
        Name of the target file
    graceid : string
        GraceDB ID
    message : string
        Message uploaded to GraceDB
    tag : str
        Name of tag to add the GraceDB log
    uploaded_filename : str
        Name of the uploaded file. If not supplied, it is the same as the
        original file name.

    Returns
    -------
    tasks : list of celery tasks

    """
    tasks = []
    for path in _find_paths_from_name(pe_results_path, filename):
        # Fall back to the on-disk name when no upload name was supplied.
        target_name = (os.path.basename(path) if uploaded_filename is None
                       else uploaded_filename)
        with open(path, 'rb') as f:
            contents = f.read()
        tasks.append(
            gracedb.upload.si(contents, target_name, graceid, message, tag)
        )
    return tasks
@app.task(ignore_result=True, shared=False)
def clean_up(rundir):
    """Remove a run directory and everything inside it.

    Parameters
    ----------
    rundir : str
        The path to a run directory where the DAG file exits

    """
    shutil.rmtree(rundir)
@app.task(ignore_result=True, shared=False)
def dag_finished(rundir, preferred_event_id, superevent_id, pe_pipeline):
    """Upload PE results and clean up run directory.

    Parameters
    ----------
    rundir : str
        The path to a run directory where the DAG file exits
    preferred_event_id : str
        The GraceDB ID of a target preferred event
    superevent_id : str
        The GraceDB ID of a target superevent
    pe_pipeline : str
        The parameter estimation pipeline used
        Either lalinference OR bilby

    Returns
    -------
    tasks : canvas
        The work-flow for uploading PE results

    """
    pe_results_path = os.path.join(
        app.conf['pe_results_path'], preferred_event_id, pe_pipeline
    )

    # Each entry: (search directory, file pattern, GraceDB message,
    # upload file name or None to keep the original name).
    if pe_pipeline == 'lalinference':
        uploads = [
            (rundir, 'glitch_median_PSD_forLI_*.dat',
             'Bayeswave PSD used for LALInference PE', None),
            (rundir, 'lalinference*.dag', 'LALInference DAG', None),
            (rundir, 'posterior*.hdf5',
             'LALInference posterior samples',
             'LALInference.posterior_samples.hdf5'),
            (pe_results_path, 'extrinsic.png',
             'LALInference corner plot for extrinsic parameters',
             'LALInference.extrinsic.png'),
            (pe_results_path, 'sourceFrame.png',
             'LALInference corner plot for source frame parameters',
             'LALInference.intrinsic.png')
        ]
    elif pe_pipeline == 'bilby':
        resultdir = os.path.join(rundir, 'result')
        uploads = [
            (resultdir, '*merge_result.json',
             'Bilby posterior samples',
             'Bilby.posterior_samples.json'),
            (resultdir, '*_extrinsic_corner.png',
             'Bilby corner plot for extrinsic parameters',
             'Bilby.extrinsic.png'),
            (resultdir, '*_intrinsic_corner.png',
             'Bilby corner plot for intrinsic parameters',
             'Bilby.intrinsic.png')
        ]
    else:
        raise NotImplementedError(f'Unknown PE pipeline {pe_pipeline}.')

    upload_tasks = []
    for search_dir, pattern, comment, upload_name in uploads:
        upload_tasks.extend(upload_results_tasks(
            search_dir, pattern, superevent_id, comment, 'pe', upload_name))

    # FIXME: _upload_url.si has to be out of group for
    # gracedb.create_label.si to run
    workflow = (
        _upload_url.si(pe_results_path, superevent_id, pe_pipeline)
        | group(upload_tasks)
        | clean_up.si(rundir)
    )
    workflow.delay()

    if pe_pipeline == 'lalinference':
        gracedb.create_label.delay('PE_READY', superevent_id)
@gracedb.task(shared=False)
def _download_psd(gid):
    """Download ``psd.xml.gz`` and return its content. If that file does not
    exist, return None.
    """
    try:
        content = gracedb.download("psd.xml.gz", gid)
    except HTTPError:
        # No PSD was uploaded for this event; the caller treats None as
        # "proceed without a PSD file".
        return None
    return content
@app.task(ignore_result=True, shared=False)
def start_pe(ini_contents, preferred_event_id, superevent_id, pe_pipeline):
    """Run Parameter Estimation on a given event.

    Parameters
    ----------
    ini_contents : str
        The content of online_lalinference_pe.ini
    preferred_event_id : str
        The GraceDB ID of a target preferred event
    superevent_id : str
        The GraceDB ID of a target superevent
    pe_pipeline : str
        The parameter estimation pipeline used
        lalinference OR bilby

    """
    gracedb.upload.delay(
        filecontents=None, filename=None, graceid=superevent_id,
        message=('Starting {} online parameter estimation '
                 'for {}').format(pe_pipeline, preferred_event_id),
        tags='pe'
    )

    # make a run directory
    pipeline_dir = os.path.expanduser('~/.cache/{}'.format(pe_pipeline))
    # os.makedirs replaces the deprecated distutils.dir_util.mkpath
    # (distutils is removed in Python 3.12); exist_ok keeps the same
    # create-if-missing semantics.
    os.makedirs(pipeline_dir, exist_ok=True)
    rundir = tempfile.mkdtemp(
        dir=pipeline_dir, prefix='{}_'.format(superevent_id)
    )

    # give permissions to read the files under the run directory so that PE
    # ROTA people can check the status of parameter estimation.
    os.chmod(rundir, 0o755)

    # Prepare the DAG, submit it to condor, then upload results and clean up.
    canvas = (
        dag_prepare_task(
            rundir, superevent_id, preferred_event_id,
            pe_pipeline, ini_contents
        )
        |
        condor.submit.s().on_error(
            job_error_notification.s(superevent_id, rundir, pe_pipeline)
        )
        |
        dag_finished.si(
            rundir, preferred_event_id, superevent_id, pe_pipeline
        )
    )
    canvas.delay()