Source code for ligo.em_bright.em_bright

"""Module containing tools for EM-Bright classification of
compact binaries using trained supervised classifier
"""
import h5py
import numpy as np
from scipy.interpolate import interp1d
from astropy import cosmology, units as u

from . import computeDiskMass, utils
from .data import EOS_BAYES_FACTORS, PACKAGE_FILENAMES

ALL_EOS_DRAWS = utils.load_eos_posterior()

_classifiers = {
    eos: utils._open_and_return_clfs(
        PACKAGE_FILENAMES[f'{eos}.pickle']
    )
    for eos in EOS_BAYES_FACTORS
}
"""Classifiers keyed on EOS names. Order (clf_ns, clf_em)"""
assert set(_classifiers) == set(EOS_BAYES_FACTORS), "Inconsistency in"
" number of trained classifiers."

_massgap_classifier = utils._open_and_return_clfs(
        PACKAGE_FILENAMES['MASS_GAP.pickle']
    )

_gstlal_ssm_classifier = utils._open_and_return_clfs(
        PACKAGE_FILENAMES['GSTLAL_SSM.pickle']
    )

_mbta_ssm_classifier = utils._open_and_return_clfs(
        PACKAGE_FILENAMES['MBTA_SSM.pickle']
    )


[docs] def mchirp(mass_1, mass_2): return (mass_1 * mass_2)**(3./5.)/(mass_1 + mass_2)**(1./5.)
[docs] def q(mass_1, mass_2): return mass_2/mass_1 if mass_2 < mass_1 else mass_1/mass_2
[docs] def source_classification(mass_1, mass_2, chi1, chi2, snr, ns_classifier=None, emb_classifier=None, massgap_classifier=None): """ Computes ``HasNS``, ``HasRemnant``, and ``MassGap`` probabilities from point mass, spin and signal to noise ratio estimates. Parameters ---------- mass_1 : float primary mass mass_2 : float secondary mass chi1 : float dimensionless primary spin chi2 : float dimensionless secondary spin snr : float signal to noise ratio of the signal ns_classifier : object, optional pickled object for NS classification emb_classifier : object, optional pickled object for EM brightness classification massgap_classifier : object, optional pickled object for EM brightness classification Returns ------- tuple (HasNS, HasRemnant, HasMassGap) predicted values. Notes ----- By default the classifiers, trained based on different nuclear equations of state (EoSs) are downloaded from the project page: https://git.ligo.org/emfollow/em-properties/em-bright. The methodology is described in arXiv:1911.00116. The score from each classifier is weighted based on the bayes factors of individual EoSs as mentioned in Table I of arXiv:2104.08681. However, if the trained classifiers are supplied via ``ns_classifier`` and ``emb_classifier``, the score is reported based on the classifier instead of re-weighting the score. Examples -------- >>> from ligo.em_bright import em_bright >>> em_bright.source_classification(2.0 ,1.0 ,0. ,0. ,10.0) (1.0, 1.0, 0.0) """ if mass_1 >= mass_2: features = [[mass_1, mass_2, chi1, chi2, snr]] elif mass_1 < mass_2: features = [[mass_2, mass_1, chi2, chi1, snr]] try: # custom classifiers supplied return ( ns_classifier.predict_proba(features).T[1][0], emb_classifier.predict_proba(features).T[1][0], massgap_classifier.predict_proba(features).T[1][0] ) except AttributeError as e: msg, *_ = e.args if msg != """'NoneType' object has no attribute 'predict_proba'""": raise reweighted_ns_score = reweighted_emb_score = 0. for eosname, bayes_factor in EOS_BAYES_FACTORS.items(): ns_classifier, emb_classifier = _classifiers[eosname] reweighted_ns_score += ns_classifier.predict_proba( features).T[1][0] * bayes_factor reweighted_emb_score += emb_classifier.predict_proba( features).T[1][0] * bayes_factor massgap_score = _massgap_classifier.predict_proba(features).T[1][0] return reweighted_ns_score, reweighted_emb_score, massgap_score
[docs] def source_classification_ssm(mass_1, mass_2, chi1, chi2, mc, snr, pipeline, ssm_classifier=None): """ Computes ``HasSSM``, ``HasNS``, ``HasMassGap`` probabilities from point mass, spin, chirp mass and signal to noise ratio estimates for SSM search. Parameters ---------- mass_1 : float primary mass mass_2 : float secondary mass chi1 : float dimensionless primary spin chi2 : float dimensionless secondary spin mc : float chirp mass snr : float signal to noise ratio of the signal pipeline : string search pipeline ssm_classifier : object, optional pickled object for gstlal ssm classification Returns ------- tuple (HasSSM, HasNS, HasMassGap) predicted values. -------- >>> from ligo.em_bright import em_bright >>> em_bright.source_classification_ssm(2.0 ,1.0 ,0. ,0. ,10.0, "gstlal") (1.0, 1.0, 0.0) """ if mass_1 >= mass_2: features = [[mass_1, mass_2, chi1, chi2, mc, snr]] elif mass_1 < mass_2: features = [[mass_2, mass_1, chi2, chi1, mc, snr]] try: # custom classifiers supplied predict = ssm_classifier.predict_proba(features).T has_SSM = predict[0][0] + predict[1][0] + predict[2][0] + predict[3][0] has_NS = predict[1][0] + predict[4][0] + predict[5][0] + predict[6][0] has_MassGap = predict[2][0] + predict[5][0] return has_SSM, has_NS, has_MassGap except AttributeError as e: msg, *_ = e.args if msg != """'NoneType' object has no attribute 'predict_proba'""": raise if pipeline == 'gstlal': clf = _gstlal_ssm_classifier elif pipeline == "mbta": clf = _mbta_ssm_classifier predict = clf.predict_proba(features).T has_SSM_score = predict[0][0] + predict[1][0] + \ predict[2][0] + predict[3][0] has_NS_score = predict[1][0] + predict[4][0] + \ predict[5][0] + predict[6][0] has_MassGap_score = predict[2][0] + predict[5][0] return has_SSM_score, has_NS_score, has_MassGap_score
[docs] def get_redshifts(distances, N=10000): """ Compute redshift using the Planck15 cosmology. Parameters ---------- distances: float or numpy.ndarray distance(s) in Mpc N : int, optional Number of steps for the computation of the interpolation function Example ------- >>> distances = np.linspace(10, 100, 10) >>> em_bright.get_redshifts(distances) array([0.00225566, 0.00450357, 0.00674384, 0.00897655, 0.01120181, 0.0134197 , 0.01563032, 0.01783375 0.02003009, 0.02221941]) Notes ----- This function accepts HDF5 posterior samples file and computes redshift by interpolating the distance-redshift relation. """ function = cosmology.Planck15.luminosity_distance min_dist = np.min(distances) max_dist = np.max(distances) z_min = cosmology.z_at_value(func=function, fval=min_dist*u.Mpc) z_max = cosmology.z_at_value(func=function, fval=max_dist*u.Mpc) z_steps = np.linspace(z_min - (0.1*z_min), z_max + (0.1*z_max), N) lum_dists = cosmology.Planck15.luminosity_distance(z_steps) s = interp1d(lum_dists, z_steps) redshifts = s(distances) return redshifts
[docs] def source_classification_pe(posterior_samples_file, **kwargs): """ Compute ``HasNS``, ``HasRemnant``, and ``HasMassGap`` probabilities from posterior samples file. Parameters ---------- posterior_samples_file : str Posterior samples file num_eos_draws : int providing an int here runs eos marginalization with the value determining how many eos's to draw eos_seed : int seed for random eos draws eosname : str Equation of state name, inferred from ``lalsimulation``. Supersedes eos marginalization method when provided. Returns ------- tuple (HasSSM, HasNS, HasRemnant, HasMassGap) predicted values. Examples -------- >>> from ligo.em_bright import em_bright >>> em_bright.source_classification_pe('posterior_samples.hdf5') (1.0, 0.96, 0.0) """ with h5py.File(posterior_samples_file, 'r') as data: samples = data['posterior_samples'][()] return source_classification_pe_from_table(samples, **kwargs)
[docs] def source_classification_pe_from_table(table, **kwargs): """ Compute ``HasNS``, ``HasRemnant``, and ``HasMassGap`` probabilities from posterior table Parameters ---------- table : numpy.recarray, dict table containing the posterior samples num_eos_draws : int providing an int here runs eos marginalization with the value determining how many eos's to draw eos_seed : int seed for random eos draws eosname : str Equation of state name, inferred from ``lalsimulation``. Supersedes eos marginalization method when provided. Returns ------- tuple (HasSSM, HasNS, HasRemnant, HasMassGap) predicted values. """ try: mass_1, mass_2 = table['mass_1_source'], table['mass_2_source'] except (ValueError, KeyError): lum_dist = table['luminosity_distance'] redshifts = get_redshifts(lum_dist) try: mass_1, mass_2 = table['mass_1'], table['mass_2'] mass_1, mass_2 = mass_1/(1 + redshifts), mass_2/(1 + redshifts) except (ValueError, KeyError): chirp_mass, mass_ratio = table['chirp_mass'], table['mass_ratio'] # noqa:E501 chirp_mass = chirp_mass/(1 + redshifts) mass_1 = chirp_mass * (1 + mass_ratio)**(1/5) * (mass_ratio)**(-3/5) # noqa:E501 mass_2 = chirp_mass * (1 + mass_ratio)**(1/5) * (mass_ratio)**(2/5) try: a_1 = table["spin_1z"] a_2 = table["spin_2z"] except (ValueError, KeyError): try: a_1 = table['a_1'] * np.cos(table['tilt_1']) a_2 = table['a_2'] * np.cos(table['tilt_2']) except (ValueError, KeyError): a_1, a_2 = np.zeros(len(mass_1)), np.zeros(len(mass_2)) return source_classification_pe_from_samples(mass_1, mass_2, a_1, a_2, **kwargs)
[docs] def source_classification_pe_from_samples(mass_1_source, mass_2_source, spin_1z, spin_2z, eosname=None, num_eos_draws=10000, eos_seed=None): """ Compute ``HasNS``, ``HasRemnant``, and ``HasMassGap`` probabilities from samples. Parameters ---------- mass_1_source : np.ndarray Samples for the source mass of the primary object mass_2_source : np.ndarray Samples for the source mass of the secondary object spin_1z : np.ndarray Samples for the spin component aligned with the orbital angular momentum for the primary object spin_2z : np.ndarray Samples for the spin component aligned with the orbital angular momentum for the secondary object num_eos_draws : int, optional providing an int here runs eos marginalization with the value determining how many eos's to draw eos_seed : int, optional seed for random eos draws eosname : str, optional Equation of state name, inferred from ``lalsimulation``. Supersedes eos marginalization method when provided. Returns ------- tuple (HasNS, HasRemnant, HasMassGap) predicted values. """ # separating non-ssm samples for diskmass computation nossm_condition = (mass_1_source >= 1) & (mass_2_source >= 1) mass_1_source_nossm = mass_1_source[nossm_condition] mass_2_source_nossm = mass_2_source[nossm_condition] spin_1z_nossm = spin_1z[nossm_condition] spin_2z_nossm = spin_2z[nossm_condition] if eosname: M_rem = computeDiskMass.computeDiskMass(mass_1_source_nossm, mass_2_source_nossm, spin_1z_nossm, spin_2z_nossm, eosname=eosname) max_mass = computeDiskMass.max_mass_from_eosname(eosname) ssm_ns_condition = ((mass_1_source >= 1) & (mass_2_source <= 1) & (mass_1_source <= max_mass)) ssm_ns_counts = ssm_ns_condition.sum() m1_ns_condition = ((mass_1_source <= max_mass) & (mass_1_source >= 1)) m2_ns_condition = ((mass_2_source <= max_mass) & (mass_2_source >= 1)) ns_condition = (m1_ns_condition | m2_ns_condition) prediction_ns = np.sum(ns_condition)/len(mass_2_source) # SSM-NS components are considered to produced hasRemnant prediction_em = (np.sum(M_rem > 0) + ssm_ns_counts)/len(mass_2_source) else: np.random.seed(eos_seed) prediction_nss, prediction_ems = [], [] # EoS draws from: 10.5281/zenodo.6502467 rand_subset = np.random.choice( len(ALL_EOS_DRAWS), num_eos_draws if num_eos_draws < len(ALL_EOS_DRAWS) else len(ALL_EOS_DRAWS), replace=False) # noqa:E501 subset_draws = ALL_EOS_DRAWS[rand_subset] # convert radius to m from km M, R = subset_draws['M'], 1000*subset_draws['R'] max_masses = np.max(M, axis=1) f_M = [interp1d(m, r, bounds_error=False) for m, r in zip(M, R)] for mass_radius_relation, max_mass in zip(f_M, max_masses): M_rem = computeDiskMass.computeDiskMass(mass_1_source_nossm, mass_2_source_nossm, spin_1z_nossm, spin_2z_nossm, eosname=mass_radius_relation, # noqa:E501 max_mass=max_mass) ssm_ns_condition = ((mass_1_source >= 1) & (mass_2_source <= 1) & (mass_1_source <= max_mass)) ssm_ns_counts = ssm_ns_condition.sum() m1_ns_condition = ((mass_1_source <= max_mass) & (mass_1_source >= 1)) m2_ns_condition = ((mass_2_source <= max_mass) & (mass_2_source >= 1)) ns_condition = (m1_ns_condition | m2_ns_condition) prediction_nss.append(np.sum( ns_condition) / len(mass_2_source)) prediction_ems.append((np.sum(M_rem > 0) + ssm_ns_counts) / len(mass_2_source)) prediction_ns = np.mean(prediction_nss) prediction_em = np.mean(prediction_ems) prediction_mg = (mass_1_source <= 5) & (mass_1_source >= 3) prediction_mg += (mass_2_source <= 5) & (mass_2_source >= 3) prediction_mg = np.sum(prediction_mg)/len(mass_1_source) prediction_ssm = (mass_2_source <= 1) prediction_ssm = np.sum(prediction_ssm)/len(mass_1_source) return prediction_ssm, prediction_ns, prediction_em, prediction_mg