Coverage for gwcelery/tasks/detchar.py: 95%
231 statements
coverage.py v7.4.4, created at 2024-04-17 17:22 +0000

"""Data quality and detector characterization tasks.

These tasks are mostly focused on checking interferometer state vectors. By
design, the [LIGO]_ and [Virgo]_ state vectors share the same definitions for
the first 8 fields.

LIGO also has a [DMT]_ DQ vector that provides some additional instrumental
checks.

References
----------
.. [LIGO] https://wiki.ligo.org/Calibration/TDCalibReview
.. [Virgo] https://dcc.ligo.org/G1801125/
.. [DMT] https://wiki.ligo.org/DetChar/DmtDqVector

"""
import getpass
import glob
import io
import json
import socket
import time

import matplotlib.pyplot as plt
import numpy as np
import sentry_sdk
from astropy.time import Time
from celery import group
from celery.utils.log import get_task_logger
from glue.lal import Cache
from gwdatafind import find_urls
from gwpy.plot import Plot
from gwpy.timeseries import Bits, StateVector, TimeSeries

from .. import _version, app
from ..jinja import env
from ..util import closing_figures
from . import gracedb

__author__ = 'Geoffrey Mo <geoffrey.mo@ligo.org>'

log = get_task_logger(__name__)


def create_cache(ifo, start, end):
    """Find .gwf files and create a cache. Look in llhoft first; if the
    frames have expired from llhoft, fall back to gwdatafind.

    Parameters
    ----------
    ifo : str
        Interferometer name (e.g. ``H1``).
    start, end : int or float
        GPS start and end times desired.

    Returns
    -------
    :class:`glue.lal.Cache`

    Example
    -------
    >>> create_cache('H1', 1198800018, 1198800618)
    [<glue.lal.CacheEntry at 0x7fbae6b71278>,
     <glue.lal.CacheEntry at 0x7fbae6ae5b38>,
     <glue.lal.CacheEntry at 0x7fbae6ae5c50>,
    ...
     <glue.lal.CacheEntry at 0x7fbae6b15080>,
     <glue.lal.CacheEntry at 0x7fbae6b15828>]

    """
    pattern = app.conf['llhoft_glob'].format(detector=ifo)
    filenames = glob.glob(pattern)
    cache = Cache.from_urls(filenames)

    try:
        cache_segment = list(cache.to_segmentlistdict().values())[0][0]
        cache_starttime = cache_segment[0]
        cache_endtime = cache_segment[1]

    except IndexError:
        log.exception('Files do not exist in llhoft_glob')
        return cache  # returns empty cache

    if (cache_starttime <= start) and (end <= cache_endtime):
        # required data is in llhoft
        return cache

    # otherwise, required data is not in the low latency cache
    high_latency = app.conf['high_latency_frame_types'][ifo]
    urls = find_urls(ifo[0], high_latency, start, end)
    if urls:
        return Cache.from_urls(urls)

    # required data not in high latency frames
    low_latency = app.conf['low_latency_frame_types'][ifo]
    urls = find_urls(ifo[0], low_latency, start, end)
    if not urls:  # required data not in low latency frames
        log.error('This data cannot be found, or does not exist.')
    return Cache.from_urls(urls)
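

# Usage sketch (illustrative GPS times; assumes frames covering the interval
# are available via llhoft or gwdatafind): the returned cache is passed
# directly to the gwpy readers used elsewhere in this module, e.g.
#
#     cache = create_cache('H1', 1187008882, 1187008884)
#     strain = TimeSeries.read(cache, app.conf['strain_channel_names']['H1'],
#                              start=1187008882, end=1187008884)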


@app.task(shared=False)
@closing_figures()
def make_omegascan(ifo, t0, durs):
    """Helper function to create a single omegascan image, with
    multiple durations.

    Parameters
    ----------
    ifo : str
        'H1', 'L1', or 'V1'
    t0 : int or float
        Central time of the omegascan.
    durs : list of tuples
        List of three tuples, with time before and time after t0 in seconds.
        Example: [(0.75, 0.25), (1.5, 0.5), (7.5, 2.5)]

    Returns
    -------
    bytes or None
        Bytes of the omegascan PNG, or None if no omegascan was created.
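
    Example
    -------
    Illustrative call (the GPS time is a placeholder; the returned PNG bytes
    depend on the strain data actually read from the cache):

    >>> png = make_omegascan('L1', 1187008882.43,
    ...                      [(0.75, 0.25), (1.5, 0.5), (7.5, 2.5)])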
    """
    # Explicitly use a non-interactive Matplotlib backend.
    plt.switch_backend('agg')

    # Collect data
    durs = np.array(durs)
    longest_before, longest_after = durs[:, 0].max(), durs[:, 1].max()
    # Add extra data buffer to avoid whitening window issues
    long_start, long_end = t0 - longest_before - 2, t0 + longest_after + 2
    cache = create_cache(ifo, long_start, long_end)
    strain_name = app.conf['strain_channel_names'][ifo]
    try:
        ts = TimeSeries.read(cache, strain_name,
                             start=long_start, end=long_end).astype('float64')
        # Do q_transforms for the different durations
        qgrams = [ts.q_transform(
                      frange=(10, 4096), gps=t0,
                      outseg=(t0 - before, t0 + after), logf=True)
                  for before, after in durs]
    except (IndexError, FloatingPointError, ValueError) as err:
        # data from cache can't be properly read, or data is weird
        sentry_sdk.capture_exception(err)
        fig = plt.figure(figsize=(4, 1))
        plt.axis("off")
        plt.text(0.5, 0.5, f"Failed to create {ifo} omegascan",
                 horizontalalignment='center', verticalalignment='center',
                 fontsize=17)
    else:
        fig = Plot(*qgrams,
                   figsize=(12 * len(durs), 6),
                   geometry=(1, len(durs)),
                   yscale='log',
                   method='pcolormesh',
                   clim=(0, 30),
                   cmap='viridis')
        for i, ax in enumerate(fig.axes):
            ax.set_title(f'Q = {qgrams[i].q:.3g}')
            if i in [1, 2]:
                ax.set_ylabel('')
            if i == 2:
                fig.colorbar(ax=ax, label='Normalized energy')
            ax.set_epoch(t0)
        fig.suptitle(f'Omegascans of {strain_name} at {t0}', fontweight="bold")
        plt.subplots_adjust(wspace=0.08)
    outfile = io.BytesIO()
    fig.savefig(outfile, format='png', dpi=150, bbox_inches='tight')
    return outfile.getvalue()


@app.task(shared=False)
def omegascan(t0, graceid):
    """Create omegascans for an event and upload them to GraceDB.

    Parameters
    ----------
    t0 : float
        Central event time.
    graceid : str
        GraceDB ID to which to upload the omegascan.

    """
    durs = app.conf['omegascan_durations']
    durs = np.array(durs)

    # Delay for early warning events (i.e. queries for times in the future)
    longest_after = durs[:, 1].max() + 2  # extra data for whitening window
    if t0 + longest_after > Time.now().gps:
        log.info("Delaying omegascan because %s is in the future",
                 graceid)
        waittime = t0 - Time.now().gps + longest_after + 10
    else:
        waittime = longest_after + 10

    group(
        make_omegascan.s(ifo, t0, durs).set(countdown=waittime)
        |
        gracedb.upload.s(f"{ifo}_omegascan.png", graceid,
                         f"{ifo} omegascan", tags=['data_quality'])
        for ifo in ['H1', 'L1', 'V1']
    ).delay()
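

# Invocation sketch (hypothetical GraceDB ID and illustrative GPS time; in
# production this task is dispatched asynchronously by other gwcelery tasks):
#
#     omegascan.delay(1187008882.43, 'G123456')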


def generate_table(title, high_bit_list, low_bit_list, unknown_bit_list):
    """Make an HTML table showing the status of the bits checked.

    Parameters
    ----------
    title : str
        Title of the table.
    high_bit_list : list
        List of bit names which are high.
    low_bit_list : list
        List of bit names which are low.
    unknown_bit_list : list
        List of bit names which are unknown.

    Returns
    -------
    str
        HTML string of the table.
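
    Example
    -------
    Illustrative call (bit names are placeholders; the exact markup is
    produced by the ``vector_table.jinja2`` template):

    >>> html = generate_table('Data quality bits', ['H1:HOFT_OK'],
    ...                       ['L1:HOFT_OK'], ['V1:HOFT_OK'])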
    """
    template = env.get_template('vector_table.jinja2')
    return template.render(title=title, high_bit_list=high_bit_list,
                           low_bit_list=low_bit_list,
                           unknown_bit_list=unknown_bit_list)


def dqr_json(state, summary):
    """Generate a DQR-compatible, JSON-ready dictionary from process results,
    as described in :class:`data-quality-report.design`.

    Parameters
    ----------
    state : {'pass', 'fail'}
        State of the detchar checks.
    summary : str
        Summary of results from the process.

    Returns
    -------
    dict
        Ready to be converted into JSON.
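
    Example
    -------
    Minimal illustrative call (the summary text is a placeholder; hostname,
    username, date, and package version are filled in from the running
    environment):

    >>> payload = dqr_json('pass', 'No DQ issues found.')
    >>> payload['state']
    'pass'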
    """
    links = [
        {
            "href":
            "https://gwcelery.readthedocs.io/en/latest/gwcelery.tasks.detchar.html#gwcelery.tasks.detchar.check_vectors",  # noqa
            "innerHTML": "a link to the documentation for this process"
        },
        {
            "href":
            "https://git.ligo.org/emfollow/gwcelery/blob/main/gwcelery/tasks/detchar.py",  # noqa
            "innerHTML": "a link to the source code in the gwcelery repo"
        }
    ]
    return dict(
        state=state,
        process_name=__name__,
        process_version=_version.get_versions()['version'],
        librarian='Geoffrey Mo (geoffrey.mo@ligo.org)',
        date=time.strftime("%H:%M:%S UTC %a %d %b %Y", time.gmtime()),
        hostname=socket.gethostname(),
        username=getpass.getuser(),
        summary=summary,
        figures=[],
        tables=[],
        links=links,
        extra=[],
    )


def ifo_from_channel(channel):
    '''Get detector prefix from a channel.

    Parameters
    ----------
    channel : str
        Channel, e.g., H1:GDS-CALIB_STRAIN.

    Returns
    -------
    str
        Detector prefix, e.g., H1.
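
    Example
    -------
    The prefix is simply everything before the colon:

    >>> ifo_from_channel('H1:GDS-CALIB_STRAIN')
    'H1'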
    '''
    ifo = channel.split(':')[0]
    assert len(ifo) == 2, "Detector name should be two letters"
    return ifo


def check_idq(cache, channel, start, end):
    """Look for iDQ frames and read them.

    Parameters
    ----------
    cache : :class:`glue.lal.Cache`
        Cache from which to check.
    channel : str
        iDQ false alarm probability (FAP) channel to read.
    start, end : int or float
        GPS start and end times desired.

    Returns
    -------
    tuple
        Tuple of the iDQ channel name and its minimum FAP in the window.

    Example
    -------
    >>> check_idq(cache, 'H1:IDQ-FAP_OVL_100_1000',
    ...           1216496260, 1216496262)
    ('H1:IDQ-FAP_OVL_100_1000', 0.003)

    """
    if cache:
        try:
            idq_fap = TimeSeries.read(
                cache, channel, start=start, end=end)
            return (channel, float(idq_fap.min().value))
        except (IndexError, RuntimeError, ValueError):
            log.exception('Failed to read from low-latency iDQ frame files')
    # FIXME: figure out how to get access to low-latency frames outside
    # of the cluster. Until we figure that out, actual I/O errors have
    # to be non-fatal.
    return (channel, None)


def check_vector(cache, channel, start, end, bits, logic_type='all'):
    """Check timeseries of decimals against a bitmask.

    This is inclusive of the start time and exclusive of the end time, i.e.
    [start, ..., end).

    Parameters
    ----------
    cache : :class:`glue.lal.Cache`
        Cache from which to check.
    channel : str
        Channel to look at, e.g. ``H1:DMT-DQ_VECTOR``.
    start, end : int or float
        GPS start and end times desired.
    bits : :class:`gwpy.timeseries.Bits`
        Definitions of the bits in the channel.
    logic_type : str, optional
        Type of logic to apply for vetoing.
        If ``all``, then all samples in the window must pass the bitmask.
        If ``any``, then one or more samples in the window must pass.

    Returns
    -------
    dict
        Maps each bit in channel to its state.

    Example
    -------
    >>> check_vector(cache, 'H1:GDS-CALIB_STATE_VECTOR', 1216496260,
    ...              1216496262, ligo_state_vector_bits)
    {'H1:HOFT_OK': True,
     'H1:OBSERVATION_INTENT': True,
     'H1:NO_STOCH_HW_INJ': True,
     'H1:NO_CBC_HW_INJ': True,
     'H1:NO_BURST_HW_INJ': True,
     'H1:NO_DETCHAR_HW_INJ': True}

    """
    if logic_type not in ('any', 'all'):
        raise ValueError("logic_type must be either 'all' or 'any'.")
    else:
        logic_map = {'any': np.any, 'all': np.all}
    bitname = '{}:{}'
    if cache:
        try:
            statevector = StateVector.read(cache, channel,
                                           start=start, end=end, bits=bits)
        except (IndexError, TypeError, ValueError):
            log.exception('Failed to read from low-latency frame files')
        else:
            # FIXME: In the playground environment, the Virgo state vector
            # channel is stored as a float. Is this also the case in the
            # production environment?
            statevector = statevector.astype(np.uint32)
            if len(statevector) > 0:  # statevector must not be empty
                return {bitname.format(ifo_from_channel(channel), key):
                        bool(logic_map[logic_type](
                            value.value if len(value.value) > 0 else None))
                        for key, value in statevector.get_bit_series().items()}
    # FIXME: figure out how to get access to low-latency frames outside
    # of the cluster. Until we figure that out, actual I/O errors have
    # to be non-fatal.
    return {bitname.format(ifo_from_channel(channel), key):
            None for key in bits if key is not None}
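

# Sketch of how the ``bits`` argument is typically constructed (this mirrors
# the ``bit_defs`` construction in ``check_vectors`` below; the configuration
# key 'llhoft_state_vector' is hypothetical and the GPS times are
# illustrative):
#
#     bitdef = app.conf['detchar_bit_definitions']['llhoft_state_vector']
#     bits = Bits(channel=bitdef['channel'], bits=bitdef['bits'])
#     check_vector(cache, 'H1:GDS-CALIB_STATE_VECTOR',
#                  1216496260, 1216496262, bits)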


@app.task(shared=False, bind=True, default_retry_delay=5)
def check_vectors(self, event, graceid, start, end):
    """Perform data quality checks for an event and label/log results to
    GraceDB.

    Depending on the pipeline, a certain amount of time (specified in
    :obj:`~gwcelery.conf.check_vector_prepost`) is appended to either side of
    the superevent start and end time. This is to catch DQ issues slightly
    before and after the event, such as that appearing in L1 just before
    GW170817.

    A cache is then created for H1, L1, and V1, regardless of the detectors
    involved in the event. Then, the bits and channels specified in the
    configuration file (:obj:`~gwcelery.conf.llhoft_channels`) are checked.
    If an injection is found in the active detectors, 'INJ' is labeled to
    GraceDB. If an injection is found in any detector, a message with the
    injection found is logged to GraceDB. If no injections are found across
    all detectors, this is logged to GraceDB.

    A similar task is performed for the DQ states described in the
    DMT-DQ_VECTOR, LIGO GDS-CALIB_STATE_VECTOR, and Virgo
    DQ_ANALYSIS_STATE_VECTOR. If no DQ issues are found in active detectors,
    'DQOK' is labeled to GraceDB. Otherwise, 'DQV' is labeled. In all cases,
    the DQ states of all the state vectors checked are logged to GraceDB.

    This skips MDC events.

    Parameters
    ----------
    event : dict
        Details of the event.
    graceid : str
        GraceID of event to which to log.
    start, end : int or float
        GPS start and end times desired.

    Returns
    -------
    event : dict
        Details of the event, reflecting any labels that were added.

    """
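    # Sketch of the ``event`` fields this task actually reads (values are
    # illustrative; 'pipeline' must be a key of
    # ``app.conf['check_vector_prepost']`` and ``app.conf['uses_gatedhoft']``):
    #
    #     event = {'graceid': 'G123456', 'instruments': 'H1,L1',
    #              'pipeline': 'gstlal', 'search': 'AllSky', 'labels': []}
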
    # Skip early warning events (i.e. queries for times in the future)
    if end > Time.now().gps:
        log.info("Skipping detchar checks because %s is in the future",
                 event['graceid'])
        return event

    # Skip MDC events.
    if event.get('search') == 'MDC':
        log.info("Skipping detchar checks because %s is an MDC",
                 event['graceid'])
        return event

    # Create caches for all detectors
    instruments = event['instruments'].split(',')
    pipeline = event['pipeline']
    pre, post = app.conf['check_vector_prepost'][pipeline]
    start, end = start - pre, end + post
    prepost_msg = "Check looked within -{}/+{} seconds of superevent. ".format(
        pre, post)

    ifos = {ifo_from_channel(key) for key, val in
            app.conf['llhoft_channels'].items()}
    caches = {ifo: create_cache(ifo, start, end) for ifo in ifos}
    bit_defs = {channel_type: Bits(channel=bitdef['channel'],
                                   bits=bitdef['bits'])
                for channel_type, bitdef
                in app.conf['detchar_bit_definitions'].items()}

    # Examine injection and DQ states
    # Do not analyze DMT-DQ_VECTOR if pipeline uses gated h(t)
    states = {}
    analysis_channels = app.conf['llhoft_channels'].items()
    if app.conf['uses_gatedhoft'][pipeline]:
        analysis_channels = {k: v for k, v in analysis_channels
                             if k[3:] != 'DMT-DQ_VECTOR'}.items()
    for channel, bits in analysis_channels:
        try:
            states.update(check_vector(
                caches[ifo_from_channel(channel)], channel,
                start, end, bit_defs[bits]))
        except ValueError as exc:
            # check_vector likely failed to find the requested data
            # in the cache because it has yet to arrive
            raise self.retry(exc=exc, max_retries=4)
    # Pick out DQ and injection states, then filter for active detectors
    dq_states = {key: value for key, value in states.items()
                 if key.split('_')[-1] != 'INJ'}
    inj_states = {key: value for key, value in states.items()
                  if key.split('_')[-1] == 'INJ'}
    active_dq_states = {key: value for key, value in dq_states.items()
                        if ifo_from_channel(key) in instruments}
    active_inj_states = {key: value for key, value in inj_states.items()
                         if ifo_from_channel(key) in instruments}

    # Check iDQ states and filter for active instruments
    idq_faps = dict(check_idq(caches[ifo_from_channel(channel)],
                              channel, start, end)
                    for channel in app.conf['idq_channels']
                    if ifo_from_channel(channel) in instruments)
    idq_oks = dict(check_idq(caches[ifo_from_channel(channel)],
                             channel, start, end)
                   for channel in app.conf['idq_ok_channels']
                   if ifo_from_channel(channel) in instruments)
    # Logging iDQ to GraceDB
    # Checking the IDQ-OK vector
    idq_not_ok_ifos = [
        ifo_from_channel(ok_channel)
        for ok_channel, min_value in idq_oks.items()
        if min_value == 0 or min_value is None]
    idq_not_ok_fap_chans = [
        chan for chan in idq_faps.keys()
        if ifo_from_channel(chan) in idq_not_ok_ifos]
    # Remove iDQ FAP channels if their IDQ_OK values are bad
    for idq_not_ok_chan in idq_not_ok_fap_chans:
        del idq_faps[idq_not_ok_chan]
    if len(idq_not_ok_ifos) > 0:
        idq_ok_msg = (f"Not checking iDQ for {', '.join(idq_not_ok_ifos)} "
                      "because it has times where IDQ_OK = 0. ")
    else:
        idq_ok_msg = ''

    if None not in idq_faps.values() and len(idq_faps) > 0:
        idq_faps_readable = {k: round(v, 3) for k, v in idq_faps.items()}
        if min(idq_faps.values()) <= app.conf['idq_fap_thresh']:
            idq_msg = ("iDQ false alarm probability is low "
                       "(below {} threshold), "
                       "i.e., there could be a data quality issue: "
                       "minimum FAP is {}. ").format(
                           app.conf['idq_fap_thresh'],
                           json.dumps(idq_faps_readable)[1:-1])
            # If iDQ FAP is low and pipeline enabled, apply DQV
            if app.conf['idq_veto'][pipeline]:
                gracedb.remove_label('DQOK', graceid)
                gracedb.create_label('DQV', graceid)
                # Add labels to return value to avoid querying GraceDB again.
                event = dict(event, labels=event.get('labels', []) + ['DQV'])
                try:
                    event['labels'].remove('DQOK')
                except ValueError:  # not in list
                    pass
        else:
            idq_msg = ("iDQ false alarm probabilities for active detectors "
                       "are good (above {} threshold). "
                       "Minimum FAP is {}. ").format(
                           app.conf['idq_fap_thresh'],
                           json.dumps(idq_faps_readable)[1:-1])
    elif None in idq_faps.values():
        idq_msg = "iDQ false alarm probabilities unknown. "
    else:
        idq_msg = ''
    gracedb.upload.delay(
        None, None, graceid,
        idq_ok_msg + idq_msg + prepost_msg, ['data_quality'])

    # Labeling INJ to GraceDB
    if False in active_inj_states.values():
        # Label 'INJ' if injection found in active IFOs
        gracedb.create_label('INJ', graceid)
        # Add labels to return value to avoid querying GraceDB again.
        event = dict(event, labels=event.get('labels', []) + ['INJ'])
    if False in inj_states.values():
        # Write all found injections into GraceDB log
        injs = [k for k, v in inj_states.items() if v is False]
        inj_fmt = "Injection found.\n{}\n"
        inj_msg = inj_fmt.format(
            generate_table('Injection bits', [], injs, []))
    elif all(inj_states.values()) and len(inj_states.values()) > 0:
        inj_msg = 'No HW injections found. '
        gracedb.remove_label('INJ', graceid)
        event = dict(event, labels=list(event.get('labels', [])))
        try:
            event['labels'].remove('INJ')
        except ValueError:  # not in list
            pass
    else:
        inj_msg = 'Injection state unknown. '
    gracedb.upload.delay(
        None, None, graceid, inj_msg + prepost_msg, ['data_quality'])

    # Determining overall_dq_active_state
    if None in active_dq_states.values() or len(
            active_dq_states.values()) == 0:
        overall_dq_active_state = None
    elif False in active_dq_states.values():
        overall_dq_active_state = False
    elif all(active_dq_states.values()):
        overall_dq_active_state = True
    goods = [k for k, v in dq_states.items() if v is True]
    bads = [k for k, v in dq_states.items() if v is False]
    unknowns = [k for k, v in dq_states.items() if v is None]
    fmt = "Detector state for active instruments is {}.\n{}"
    msg = fmt.format(
        {None: 'unknown', False: 'bad', True: 'good'}[overall_dq_active_state],
        generate_table('Data quality bits', goods, bads, unknowns)
    )
    if app.conf['uses_gatedhoft'][pipeline]:
        gate_msg = ('Pipeline {} uses gated h(t),'
                    ' LIGO DMT-DQ_VECTOR not checked.').format(pipeline)
    else:
        gate_msg = ''

    # Labeling DQOK/DQV to GraceDB
    gracedb.upload.delay(
        None, None, graceid, msg + prepost_msg + gate_msg, ['data_quality'])
    if overall_dq_active_state is True:
        state = "pass"
        gracedb.remove_label('DQV', graceid)
        gracedb.create_label('DQOK', graceid)
        # Add labels to return value to avoid querying GraceDB again.
        event = dict(event, labels=event.get('labels', []) + ['DQOK'])
        try:
            event['labels'].remove('DQV')
        except ValueError:  # not in list
            pass
    elif overall_dq_active_state is False:
        state = "fail"
        gracedb.remove_label('DQOK', graceid)
        gracedb.create_label('DQV', graceid)
        # Add labels to return value to avoid querying GraceDB again.
        event = dict(event, labels=event.get('labels', []) + ['DQV'])
        try:
            event['labels'].remove('DQOK')
        except ValueError:  # not in list
            pass
    else:
        state = "unknown"

    # Create and upload DQR-compatible json
    state_summary = '{} {} {}'.format(
        inj_msg, idq_ok_msg + idq_msg, msg)
    if state == "unknown":
        json_state = "error"
    else:
        json_state = state
    file = dqr_json(json_state, state_summary)
    filename = 'gwcelerydetcharcheckvectors-{}.json'.format(graceid)
    message = "DQR-compatible json generated from check_vectors results"
    gracedb.upload.delay(
        json.dumps(file), filename, graceid, message)

    return event