diff --git a/gwpopulation/__init__.py b/gwpopulation/__init__.py index c7205e44..0e01b205 100644 --- a/gwpopulation/__init__.py +++ b/gwpopulation/__init__.py @@ -14,7 +14,8 @@ The code is hosted at ``_. """ -from . import conversions, cupy_utils, hyperpe, models, utils, vt +from . import conversions, hyperpe, models, utils, vt +from .backend import SUPPORTED_BACKENDS, disable_cupy, enable_cupy, set_backend from .hyperpe import RateLikelihood try: @@ -22,31 +23,7 @@ except ModuleNotFoundError: # development mode __version__ = "unknown" - -__all_with_xp = [ - models.mass, - models.redshift, - models.spin, - cupy_utils, - hyperpe, - utils, - vt, -] - - -def disable_cupy(): - import numpy as np - - for module in __all_with_xp: - module.xp = np - - -def enable_cupy(): - try: - import cupy as cp - except ImportError: - import numpy as cp - - print("Cannot import cupy, falling back to numpy.") - for module in __all_with_xp: - module.xp = cp +try: + set_backend("cupy") +except ModuleNotFoundError: + set_backend("numpy") diff --git a/gwpopulation/backend.py b/gwpopulation/backend.py new file mode 100644 index 00000000..765b1ef6 --- /dev/null +++ b/gwpopulation/backend.py @@ -0,0 +1,56 @@ +__all_with_xp = [ + "hyperpe", + "models.interped", + "models.mass", + "models.redshift", + "models.spin", + "utils", + "vt", +] +__all_with_scs = ["models.mass", "utils"] +__backend__ = "" +SUPPORTED_BACKENDS = ["numpy", "cupy"] +_scipy_module = dict(numpy="scipy", cupy="cupyx.scipy") + + +def disable_cupy(): + from warnings import warn + + warn( + f"Function enable_cupy is deprecated, use set_backed('cupy') instead", + DeprecationWarning, + ) + set_backend(backend="numpy") + + +def enable_cupy(): + from warnings import warn + + warn( + f"Function enable_cupy is deprecated, use set_backed('cupy') instead", + DeprecationWarning, + ) + set_backend(backend="cupy") + + +def set_backend(backend="numpy"): + global __backend__ + if backend not in SUPPORTED_BACKENDS: + raise ValueError( + f"Backend {backend} not supported, should be in {', '.join(SUPPORTED_BACKENDS)}" + ) + elif backend == __backend__: + return + + from importlib import import_module + + try: + xp = import_module(backend) + scs = import_module(_scipy_module[backend]).special + except ModuleNotFoundError: + raise ModuleNotFoundError(f"{backend} not installed") + for module in __all_with_xp: + __backend__ = backend + import_module(f".{module}", package="gwpopulation").xp = xp + for module in __all_with_scs: + import_module(f".{module}", package="gwpopulation").scs = scs diff --git a/gwpopulation/cupy_utils.py b/gwpopulation/cupy_utils.py deleted file mode 100644 index c20f059e..00000000 --- a/gwpopulation/cupy_utils.py +++ /dev/null @@ -1,119 +0,0 @@ -""" -Helper functions for missing functionality in cupy. -""" - -try: - import cupy as xp - from cupyx.scipy.special import erf, gammaln, i0e # noqa - - CUPY_LOADED = True -except ImportError: - import numpy as xp - from scipy.special import erf, gammaln, i0e # noqa - - CUPY_LOADED = False - - -def betaln(alpha, beta): - r""" - Logarithm of the Beta function - - .. math:: - \ln B(\alpha, \beta) = \frac{\ln\gamma(\alpha)\ln\gamma(\beta)}{\ln\gamma(\alpha + \beta)} - - Parameters - ---------- - alpha: float - The Beta alpha parameter (:math:`\alpha`) - beta: float - The Beta beta parameter (:math:`\beta`) - - Returns - ------- - ln_beta: float, array-like - The ln Beta function - - """ - ln_beta = gammaln(alpha) + gammaln(beta) - gammaln(alpha + beta) - return ln_beta - - -def to_numpy(array): - """Cast any array to numpy""" - if not CUPY_LOADED: - return array - else: - return xp.asnumpy(array) - - -def trapz(y, x=None, dx=1.0, axis=-1): - """ - Lifted from `numpy `_. - - Integrate along the given axis using the composite trapezoidal rule. - Integrate `y` (`x`) along given axis. - - Parameters - ========== - y : array_like - Input array to integrate. - x : array_like, optional - The sample points corresponding to the `y` values. If `x` is None, - the sample points are assumed to be evenly spaced `dx` apart. The - default is None. - dx : scalar, optional - The spacing between sample points when `x` is None. The default is 1. - axis : int, optional - The axis along which to integrate. - - Returns - ======= - trapz : float - Definite integral as approximated by trapezoidal rule. - - - References - ========== - .. [1] Wikipedia page: http://en.wikipedia.org/wiki/Trapezoidal_rule - - Examples - ======== - >>> trapz([1,2,3]) - 4.0 - >>> trapz([1,2,3], x=[4,6,8]) - 8.0 - >>> trapz([1,2,3], dx=2) - 8.0 - >>> a = xp.arange(6).reshape(2, 3) - >>> a - array([[0, 1, 2], - [3, 4, 5]]) - >>> trapz(a, axis=0) - array([ 1.5, 2.5, 3.5]) - >>> trapz(a, axis=1) - array([ 2., 8.]) - """ - y = xp.asanyarray(y) - if x is None: - d = dx - else: - x = xp.asanyarray(x) - if x.ndim == 1: - d = xp.diff(x) - # reshape to correct shape - shape = [1] * y.ndim - shape[axis] = d.shape[0] - d = d.reshape(shape) - else: - d = xp.diff(x, axis=axis) - ndim = y.ndim - slice1 = [slice(None)] * ndim - slice2 = [slice(None)] * ndim - slice1[axis] = slice(1, None) - slice2[axis] = slice(None, -1) - product = d * (y[tuple(slice1)] + y[tuple(slice2)]) / 2.0 - try: - ret = product.sum(axis) - except ValueError: - ret = xp.add.reduce(product, axis) - return ret diff --git a/gwpopulation/hyperpe.py b/gwpopulation/hyperpe.py index f9c9bf4b..33f9758e 100644 --- a/gwpopulation/hyperpe.py +++ b/gwpopulation/hyperpe.py @@ -9,8 +9,9 @@ from bilby.core.utils import logger from bilby.hyper.model import Model -from .cupy_utils import CUPY_LOADED, to_numpy, xp -from .utils import get_name +from .utils import get_name, to_numpy + +xp = np class HyperparameterLikelihood(Likelihood): @@ -67,8 +68,13 @@ def __init__( If the uncertainty is larger than this value a log likelihood of -inf will be returned. Default = inf """ - if cupy and not CUPY_LOADED: - logger.warning("Cannot import cupy, falling back to numpy.") + if cupy: + from .backend import set_backend + + try: + set_backend("cupy") + except ImportError: + logger.warning(f"Cupy not available, using {xp.__name__}.") self.samples_per_posterior = max_samples self.data = self.resample_posteriors(posteriors, max_samples=max_samples) diff --git a/gwpopulation/models/interped.py b/gwpopulation/models/interped.py index aa37107e..824e855a 100644 --- a/gwpopulation/models/interped.py +++ b/gwpopulation/models/interped.py @@ -1,6 +1,6 @@ import numpy as np -from ..cupy_utils import trapz, xp +xp = np class InterpolatedNoBaseModelIdentical(object): @@ -70,7 +70,7 @@ def norm_p_x(self, f_splines=None, x_splines=None, **kwargs): perturbation = self._norm_spline(x=self._xs[self.norm_selector], y=f_splines) p_x = xp.zeros(len(self._xs)) p_x[self.norm_selector] = xp.exp(perturbation) - norm = trapz(p_x, self._xs) + norm = xp.trapz(p_x, self._xs) return norm def p_x_identical(self, dataset, **kwargs): diff --git a/gwpopulation/models/mass.py b/gwpopulation/models/mass.py index 2988f609..043cd385 100644 --- a/gwpopulation/models/mass.py +++ b/gwpopulation/models/mass.py @@ -3,9 +3,13 @@ """ import inspect -from ..cupy_utils import trapz, xp +import numpy as np +import scipy.special as scs + from ..utils import powerlaw, truncnorm +xp = np + def double_power_law_primary_mass(mass, alpha_1, alpha_2, mmin, mmax, break_fraction): r""" @@ -555,7 +559,7 @@ def norm_p_m1(self, delta_m, **kwargs): p_m = self.__class__.primary_model(self.m1s, **kwargs) p_m *= self.smoothing(self.m1s, mmin=mmin, mmax=self.mmax, delta_m=delta_m) - norm = trapz(p_m, self.m1s) + norm = xp.trapz(p_m, self.m1s) return norm def p_q(self, dataset, beta, mmin, delta_m): @@ -582,29 +586,25 @@ def norm_p_q(self, beta, mmin, delta_m): p_q *= self.smoothing( self.m1s_grid * self.qs_grid, mmin=mmin, mmax=self.m1s_grid, delta_m=delta_m ) - norms = trapz(p_q, self.qs, axis=0) - - all_norms = ( - norms[self.n_below] * (1 - self.step) + norms[self.n_above] * self.step - ) + norms = xp.nan_to_num(xp.trapz(p_q, self.qs, axis=0)) - return all_norms + return self._q_interpolant(norms) def _cache_q_norms(self, masses): """ Cache the information necessary for linear interpolation of the mass ratio normalisation """ - self.n_below = xp.zeros_like(masses, dtype=int) - 1 - m_below = xp.zeros_like(masses) - for mm in self.m1s: - self.n_below += masses > mm - m_below[masses > mm] = mm - self.n_above = self.n_below + 1 - max_idx = len(self.m1s) - self.n_below[self.n_below < 0] = 0 - self.n_above[self.n_above == max_idx] = max_idx - 1 - self.step = xp.minimum((masses - m_below) / self.dm, 1) + from functools import partial + + from cached_interpolate import RegularCachingInterpolant as CachingInterpolant + + from ..utils import to_numpy + + nodes = to_numpy(self.m1s) + interpolant = CachingInterpolant(nodes, nodes, kind="cubic", backend=xp) + interpolant.conversion = xp.asarray(interpolant.conversion) + self._q_interpolant = partial(interpolant, xp.asarray(masses)) @staticmethod def smoothing(masses, mmin, mmax, delta_m): @@ -623,17 +623,14 @@ def smoothing(masses, mmin, mmax, delta_m): See also, https://en.wikipedia.org/wiki/Window_function#Planck-taper_window """ - window = xp.ones_like(masses) if delta_m > 0.0: - smoothing_region = (masses >= mmin) & (masses < (mmin + delta_m)) - shifted_mass = masses[smoothing_region] - mmin - if shifted_mass.size: - exponent = xp.nan_to_num( - delta_m / shifted_mass + delta_m / (shifted_mass - delta_m) - ) - window[smoothing_region] = 1 / (xp.exp(exponent) + 1) - window[(masses < mmin) | (masses > mmax)] = 0 - return window + shifted_mass = xp.clip((masses - mmin) / delta_m, 1e-6, 1 - 1e-6) + exponent = 1 / shifted_mass - 1 / (1 - shifted_mass) + window = scs.expit(-exponent) + window *= (masses >= mmin) * (masses <= mmax) + return window + else: + return xp.ones(masses.shape) class SinglePeakSmoothedMassDistribution(BaseSmoothedMassDistribution): diff --git a/gwpopulation/models/redshift.py b/gwpopulation/models/redshift.py index 50ffd5f5..df640d4f 100644 --- a/gwpopulation/models/redshift.py +++ b/gwpopulation/models/redshift.py @@ -4,7 +4,9 @@ import numpy as np -from ..cupy_utils import to_numpy, trapz, xp +from ..utils import to_numpy + +xp = np class _Redshift(object): @@ -49,7 +51,7 @@ def normalisation(self, parameters): (float, array-like): Total spacetime volume """ psi_of_z = self.psi_of_z(redshift=self.zs, **parameters) - norm = trapz(psi_of_z * self.dvc_dz / (1 + self.zs), self.zs) + norm = xp.trapz(psi_of_z * self.dvc_dz / (1 + self.zs), self.zs) return norm def probability(self, dataset, **parameters): diff --git a/gwpopulation/models/spin.py b/gwpopulation/models/spin.py index ce1e0b32..bbf136af 100644 --- a/gwpopulation/models/spin.py +++ b/gwpopulation/models/spin.py @@ -1,8 +1,8 @@ """ Implemented spin models """ +import numpy as xp -from ..cupy_utils import xp from ..utils import beta_dist, truncnorm, unnormalized_2d_gaussian from .interped import InterpolatedNoBaseModelIdentical diff --git a/gwpopulation/utils.py b/gwpopulation/utils.py index b94ccbdc..1965a913 100644 --- a/gwpopulation/utils.py +++ b/gwpopulation/utils.py @@ -1,10 +1,12 @@ """ Helper functions for probability distributions. """ +from numbers import Number -import os +import numpy as np +from scipy import special as scs -from .cupy_utils import betaln, erf, i0e, xp +xp = np def beta_dist(xx, alpha, beta, scale=1): @@ -36,7 +38,7 @@ def beta_dist(xx, alpha, beta, scale=1): if beta < 0: raise ValueError(f"Parameter beta must be greater or equal zero, low={beta}.") ln_beta = (alpha - 1) * xp.log(xx) + (beta - 1) * xp.log(scale - xx) - ln_beta -= betaln(alpha, beta) + ln_beta -= scs.betaln(alpha, beta) ln_beta -= (alpha + beta - 1) * xp.log(scale) prob = xp.exp(ln_beta) prob = xp.nan_to_num(prob) @@ -112,7 +114,9 @@ def truncnorm(xx, mu, sigma, high, low): if sigma <= 0: raise ValueError(f"Sigma must be greater than 0, sigma={sigma}") norm = 2**0.5 / xp.pi**0.5 / sigma - norm /= erf((high - mu) / 2**0.5 / sigma) + erf((mu - low) / 2**0.5 / sigma) + norm /= scs.erf((high - mu) / 2**0.5 / sigma) + scs.erf( + (mu - low) / 2**0.5 / sigma + ) prob = xp.exp(-xp.power(xx - mu, 2) / (2 * sigma**2)) prob *= norm prob *= (xx <= high) & (xx >= low) @@ -191,7 +195,7 @@ def von_mises(xx, mu, kappa): For numerical stability, the factor of `exp(kappa)` from using `i0e` is accounted for in the numerator """ - return xp.exp(kappa * (xp.cos(xx - mu) - 1)) / (2 * xp.pi * i0e(kappa)) + return xp.exp(kappa * (xp.cos(xx - mu) - 1)) / (2 * xp.pi * scs.i0e(kappa)) def get_version_information(): @@ -218,3 +222,23 @@ def get_name(input): return input.__name__ else: return input.__class__.__name__ + + +def to_numpy(array): + """ + Convert an array to a numpy array. + Numeric types and pandas objects are returned unchanged. + + Parameters + ========== + array: array-like + The array to convert. + """ + if isinstance(array, (Number, np.ndarray)): + return array + elif "cupy" in array.__class__.__module__: + return xp.asnumpy(array) + elif "pandas" in array.__class__.__module__: + return array + else: + raise TypeError(f"Cannot convert {type(array)} to numpy array") diff --git a/gwpopulation/vt.py b/gwpopulation/vt.py index 0bf184e1..4e0945bf 100644 --- a/gwpopulation/vt.py +++ b/gwpopulation/vt.py @@ -5,9 +5,10 @@ import numpy as np from bilby.hyper.model import Model -from .cupy_utils import trapz, xp from .models.redshift import _Redshift, total_four_volume +xp = np + class _BaseVT(object): def __init__(self, model, data): @@ -40,14 +41,16 @@ def __init__(self, model, data): self.values = {key: xp.unique(self.data[key]) for key in self.data} shape = np.array(list(self.data.values())[0].shape) lens = {key: len(self.values[key]) for key in self.data} - self.axes = {int(np.where(shape == lens[key])[0]): key for key in self.data} + self.axes = {int(np.where(shape == lens[key])[0][0]): key for key in self.data} self.ndim = len(self.axes) def __call__(self, parameters): self.model.parameters.update(parameters) vt_fac = self.model.prob(self.data) * self.vts for ii in range(self.ndim): - vt_fac = trapz(vt_fac, self.values[self.axes[self.ndim - ii - 1]], axis=-1) + vt_fac = xp.trapz( + vt_fac, self.values[self.axes[self.ndim - ii - 1]], axis=-1 + ) return vt_fac diff --git a/test/__init__.py b/test/__init__.py index e69de29b..050c8bae 100644 --- a/test/__init__.py +++ b/test/__init__.py @@ -0,0 +1,11 @@ +import importlib + +from gwpopulation import backend as _backend + +TEST_BACKENDS = list() +for backend in _backend.SUPPORTED_BACKENDS: + try: + importlib.import_module(backend) + except ImportError: + continue + TEST_BACKENDS.append(backend) diff --git a/test/backend_test.py b/test/backend_test.py new file mode 100644 index 00000000..33bc6e23 --- /dev/null +++ b/test/backend_test.py @@ -0,0 +1,29 @@ +import numpy +import pytest + +import gwpopulation + + +def test_unsupported_backend_raises_value_error(): + with pytest.raises(ValueError): + gwpopulation.set_backend("fail") + + +def test_set_backend_numpy(): + gwpopulation.set_backend("numpy") + from gwpopulation.utils import xp + + assert xp == numpy + + +def test_enable_cupy_deprecated(): + with pytest.deprecated_call(): + try: + gwpopulation.enable_cupy() + except ModuleNotFoundError: + pass + + +def test_disable_cupy_deprecated(): + with pytest.deprecated_call(): + gwpopulation.disable_cupy() diff --git a/test/conversion_test.py b/test/conversion_test.py index 66959b56..6812c5e1 100644 --- a/test/conversion_test.py +++ b/test/conversion_test.py @@ -1,66 +1,63 @@ -import unittest - from gwpopulation.conversions import * - -class TestBetaConversion(unittest.TestCase): - def setUp(self): - self.known_values = dict( - alpha=[1, 2, 2], - beta=[1, 4, 4], - amax=[1, 1, 0.5], - mu=[1 / 2, 1 / 3, 1 / 6], - var=[1 / 12, 2 / 63, 1 / 126], +known_values = dict( + alpha=[1, 2, 2], + beta=[1, 4, 4], + amax=[1, 1, 0.5], + mu=[1 / 2, 1 / 3, 1 / 6], + var=[1 / 12, 2 / 63, 1 / 126], +) +suffices = ["", "_1", "_2"] + + +def test_mu_chi_var_chi_max_to_alpha_beta_max(): + for ii in range(3): + mu = known_values["mu"][ii] + var = known_values["var"][ii] + amax = known_values["amax"][ii] + alpha, beta, _ = mu_var_max_to_alpha_beta_max(mu, var, amax) + assert abs(alpha - known_values["alpha"][ii]) < 1e-6 + assert abs(beta == known_values["beta"][ii]) < 1e-6 + assert amax == known_values["amax"][ii] + + +def test_alpha_beta_max_to_mu_chi_var_chi_max(): + for ii in range(3): + alpha = known_values["alpha"][ii] + beta = known_values["beta"][ii] + amax = known_values["amax"][ii] + mu, var, _ = alpha_beta_max_to_mu_var_max(alpha, beta, amax) + assert mu == known_values["mu"][ii] + assert var == known_values["var"][ii] + + +def test_convert_to_beta_parameters(): + for ii, suffix in enumerate(suffices): + params = dict() + params["mu_chi" + suffix] = known_values["mu"][ii] + params["sigma_chi" + suffix] = known_values["var"][ii] + params["amax" + suffix] = known_values["amax"][ii] + new_params, _ = convert_to_beta_parameters(params, remove=True) + full_dict = params.copy() + alpha, beta, _ = mu_var_max_to_alpha_beta_max( + mu=params["mu_chi" + suffix], + var=params["sigma_chi" + suffix], + amax=params["amax" + suffix], ) - self.suffices = ["", "_1", "_2"] - - def tearDown(self): - pass - - def test_mu_chi_var_chi_max_to_alpha_beta_max(self): - for ii in range(3): - mu = self.known_values["mu"][ii] - var = self.known_values["var"][ii] - amax = self.known_values["amax"][ii] - alpha, beta, _ = mu_var_max_to_alpha_beta_max(mu, var, amax) - self.assertAlmostEqual(alpha, self.known_values["alpha"][ii]) - self.assertAlmostEqual(beta, self.known_values["beta"][ii]) - self.assertAlmostEqual(amax, self.known_values["amax"][ii]) + full_dict["alpha_chi" + suffix] = alpha + full_dict["beta_chi" + suffix] = beta + print(new_params, full_dict) + for key in full_dict: + assert new_params[key] == full_dict[key] - def test_alpha_beta_max_to_mu_chi_var_chi_max(self): - for ii in range(3): - alpha = self.known_values["alpha"][ii] - beta = self.known_values["beta"][ii] - amax = self.known_values["amax"][ii] - mu, var, _ = alpha_beta_max_to_mu_var_max(alpha, beta, amax) - self.assertAlmostEqual(mu, self.known_values["mu"][ii]) - self.assertAlmostEqual(var, self.known_values["var"][ii]) - def test_convert_to_beta_parameters(self): - for ii, suffix in enumerate(self.suffices): - params = dict() - params["mu_chi" + suffix] = self.known_values["mu"][ii] - params["sigma_chi" + suffix] = self.known_values["var"][ii] - params["amax" + suffix] = self.known_values["amax"][ii] - new_params, _ = convert_to_beta_parameters(params, remove=True) - full_dict = params.copy() - alpha, beta, _ = mu_var_max_to_alpha_beta_max( - mu=params["mu_chi" + suffix], - var=params["sigma_chi" + suffix], - amax=params["amax" + suffix], - ) - full_dict["alpha_chi" + suffix] = alpha - full_dict["beta_chi" + suffix] = beta - print(new_params, full_dict) - for key in full_dict: - self.assertAlmostEqual(new_params[key], full_dict[key]) +def test_convert_to_beta_parameters_with_none(): + params = dict(amax=1, alpha_chi=None, beta_chi=None, mu_chi=0.5, sigma_chi=0.1) + _, added = convert_to_beta_parameters(params, remove=True) + assert len(added) == 2 - def test_convert_to_beta_parameters_with_none(self): - params = dict(amax=1, alpha_chi=None, beta_chi=None, mu_chi=0.5, sigma_chi=0.1) - new_params, added = convert_to_beta_parameters(params, remove=True) - self.assertTrue(len(added) == 2) - def test_convert_to_beta_parameters_unnecessary(self): - params = dict(amax=1, alpha_chi=1, beta_chi=1) - new_params, added = convert_to_beta_parameters(params, remove=True) - self.assertTrue(len(added) == 0) +def test_convert_to_beta_parameters_unnecessary(): + params = dict(amax=1, alpha_chi=1, beta_chi=1) + _, added = convert_to_beta_parameters(params, remove=True) + assert len(added) == 0 diff --git a/test/example_test.py b/test/example_test.py index f4445c5b..b03083cd 100644 --- a/test/example_test.py +++ b/test/example_test.py @@ -2,11 +2,17 @@ import bilby import pandas as pd +import pytest import gwpopulation +from . import TEST_BACKENDS -def test_likelihood_evaluation(): + +@pytest.mark.parametrize("backend", TEST_BACKENDS) +def test_likelihood_evaluation(backend): + gwpopulation.set_backend(backend) + xp = gwpopulation.models.mass.xp bilby.core.utils.random.seed(10) rng = bilby.core.utils.random.rng @@ -41,12 +47,17 @@ def test_likelihood_evaluation(): pd.DataFrame({key: rng.uniform(*bound, 100) for key, bound in bounds.items()}) for _ in range(10) ] - vt_data = {key: rng.uniform(*bound, 10000) for key, bound in bounds.items()} + vt_data = { + key: xp.asarray(rng.uniform(*bound, 10000)) for key, bound in bounds.items() + } selection = gwpopulation.vt.ResamplingVT(vt_model, vt_data, len(posteriors)) likelihood = gwpopulation.hyperpe.HyperparameterLikelihood( - hyper_prior=model, posteriors=posteriors, selection_function=selection + hyper_prior=model, + posteriors=posteriors, + selection_function=selection, + cupy=backend == "cupy", ) priors = bilby.core.prior.PriorDict("priors/bbh_population.prior") @@ -57,4 +68,4 @@ def test_likelihood_evaluation(): def test_prior_files_load(): for fname in glob.glob("priors/*.prior"): - priors = bilby.core.prior.PriorDict(fname) + _ = bilby.core.prior.PriorDict(fname) diff --git a/test/likelihood_test.py b/test/likelihood_test.py index bedcf14a..a42a8424 100644 --- a/test/likelihood_test.py +++ b/test/likelihood_test.py @@ -2,21 +2,19 @@ import numpy as np import pandas as pd - -try: - import cupy as xp -except ImportError: - xp = np - from bilby.core.prior import PriorDict, Uniform from bilby.hyper.model import Model +import gwpopulation from gwpopulation.hyperpe import HyperparameterLikelihood, RateLikelihood from gwpopulation.models.mass import SinglePeakSmoothedMassDistribution +xp = np + class Likelihoods(unittest.TestCase): def setUp(self): + gwpopulation.set_backend("numpy") self.params = dict(a=1, b=1, c=1) self.model = lambda dataset, a, b, c: dataset["a"] one_data = pd.DataFrame({key: xp.ones(500) for key in self.params}) diff --git a/test/mass_test.py b/test/mass_test.py index 0d5c729d..bee2bb25 100644 --- a/test/mass_test.py +++ b/test/mass_test.py @@ -1,293 +1,340 @@ -import unittest - import numpy as np +import pytest from bilby.core.prior import PriorDict, Uniform -from gwpopulation.cupy_utils import trapz, xp +import gwpopulation from gwpopulation.models import mass +from gwpopulation.utils import to_numpy +from . import TEST_BACKENDS -class TestDoublePowerLaw(unittest.TestCase): - def setUp(self): - self.m1s = np.linspace(3, 100, 1000) - self.qs = np.linspace(0.01, 1, 500) - m1s_grid, qs_grid = xp.meshgrid(self.m1s, self.qs) - self.dataset = dict(mass_1=m1s_grid, mass_ratio=qs_grid) - self.power_prior = PriorDict() - self.power_prior["alpha_1"] = Uniform(minimum=-4, maximum=12) - self.power_prior["alpha_2"] = Uniform(minimum=-4, maximum=12) - self.power_prior["beta"] = Uniform(minimum=-4, maximum=12) - self.power_prior["mmin"] = Uniform(minimum=3, maximum=10) - self.power_prior["mmax"] = Uniform(minimum=40, maximum=100) - self.power_prior["break_fraction"] = Uniform(minimum=40, maximum=100) - self.n_test = 10 - - def test_double_power_law_zero_below_mmin(self): - for ii in range(self.n_test): - parameters = self.power_prior.sample() - del parameters["beta"] - p_m = mass.double_power_law_primary_mass(self.m1s, **parameters) - self.assertEqual(xp.max(p_m[self.m1s <= parameters["mmin"]]), 0.0) - - def test_power_law_primary_mass_ratio_zero_above_mmax(self): - for ii in range(self.n_test): - parameters = self.power_prior.sample() - p_m = mass.double_power_law_primary_power_law_mass_ratio( - self.dataset, **parameters - ) - self.assertEqual( - xp.max(p_m[self.dataset["mass_1"] >= parameters["mmax"]]), 0.0 - ) - - -class TestPrimaryMassRatio(unittest.TestCase): - def setUp(self): - self.m1s = np.linspace(3, 100, 1000) - self.qs = np.linspace(0.01, 1, 500) - m1s_grid, qs_grid = xp.meshgrid(self.m1s, self.qs) - self.dataset = dict(mass_1=m1s_grid, mass_ratio=qs_grid) - self.power_prior = PriorDict() - self.power_prior["alpha"] = Uniform(minimum=-4, maximum=12) - self.power_prior["beta"] = Uniform(minimum=-4, maximum=12) - self.power_prior["mmin"] = Uniform(minimum=3, maximum=10) - self.power_prior["mmax"] = Uniform(minimum=40, maximum=100) - self.gauss_prior = PriorDict() - self.gauss_prior["lam"] = Uniform(minimum=0, maximum=1) - self.gauss_prior["mpp"] = Uniform(minimum=20, maximum=60) - self.gauss_prior["sigpp"] = Uniform(minimum=0, maximum=10) - self.n_test = 10 - - def test_power_law_primary_mass_ratio_zero_below_mmin(self): - m2s = self.dataset["mass_1"] * self.dataset["mass_ratio"] - for ii in range(self.n_test): - parameters = self.power_prior.sample() - p_m = mass.power_law_primary_mass_ratio(self.dataset, **parameters) - self.assertEqual(xp.max(p_m[m2s <= parameters["mmin"]]), 0.0) - - def test_power_law_primary_mass_ratio_zero_above_mmax(self): - for ii in range(self.n_test): - parameters = self.power_prior.sample() - p_m = mass.power_law_primary_mass_ratio(self.dataset, **parameters) - self.assertEqual( - xp.max(p_m[self.dataset["mass_1"] >= parameters["mmax"]]), 0.0 - ) - - def test_two_component_primary_mass_ratio_zero_below_mmin(self): - m2s = self.dataset["mass_1"] * self.dataset["mass_ratio"] - for ii in range(self.n_test): - parameters = self.power_prior.sample() - parameters.update(self.gauss_prior.sample()) - p_m = mass.two_component_primary_mass_ratio(self.dataset, **parameters) - self.assertEqual(xp.max(p_m[m2s <= parameters["mmin"]]), 0.0) - - -class TestPrimarySecondary(unittest.TestCase): - def setUp(self): - self.ms = np.linspace(3, 100, 1000) - self.dm = self.ms[1] - self.ms[0] - m1s_grid, m2s_grid = xp.meshgrid(self.ms, self.ms) - self.dataset = dict(mass_1=m1s_grid, mass_2=m2s_grid) - self.power_prior = PriorDict() - self.power_prior["alpha"] = Uniform(minimum=-4, maximum=12) - self.power_prior["beta"] = Uniform(minimum=-4, maximum=12) - self.power_prior["mmin"] = Uniform(minimum=3, maximum=10) - self.power_prior["mmax"] = Uniform(minimum=40, maximum=100) - self.gauss_prior = PriorDict() - self.gauss_prior["lam"] = Uniform(minimum=0, maximum=1) - self.gauss_prior["mpp"] = Uniform(minimum=20, maximum=60) - self.gauss_prior["sigpp"] = Uniform(minimum=0, maximum=10) - self.n_test = 10 - - def test_power_law_primary_secondary_zero_below_mmin(self): - for ii in range(self.n_test): - parameters = self.power_prior.sample() - p_m = mass.power_law_primary_secondary_independent( - self.dataset, **parameters - ) - self.assertEqual( - xp.max(p_m[self.dataset["mass_2"] <= parameters["mmin"]]), 0.0 - ) - - def test_power_law_primary_secondary_zero_above_mmax(self): - for ii in range(self.n_test): - parameters = self.power_prior.sample() - del parameters["beta"] - p_m = mass.power_law_primary_secondary_identical(self.dataset, **parameters) - self.assertEqual( - xp.max(p_m[self.dataset["mass_1"] >= parameters["mmax"]]), 0.0 - ) - - def test_two_component_primary_secondary_zero_below_mmin(self): - for ii in range(self.n_test): - parameters = self.power_prior.sample() - parameters.update(self.gauss_prior.sample()) - del parameters["beta"] - p_m = mass.two_component_primary_secondary_identical( - self.dataset, **parameters - ) - self.assertEqual( - xp.max(p_m[self.dataset["mass_2"] <= parameters["mmin"]]), 0.0 - ) - - -class TestSmoothedMassDistribution(unittest.TestCase): - def setUp(self): - self.m1s = np.linspace(2, 100, 1000) - self.qs = np.linspace(0.01, 1, 500) - self.dm = self.m1s[1] - self.m1s[0] - self.dq = self.qs[1] - self.qs[0] - m1s_grid, qs_grid = xp.meshgrid(self.m1s, self.qs) - self.dataset = dict(mass_1=m1s_grid, mass_ratio=qs_grid) - self.power_prior = PriorDict() - self.power_prior["alpha"] = Uniform(minimum=-4, maximum=12) - self.power_prior["beta"] = Uniform(minimum=-4, maximum=12) - self.power_prior["mmin"] = Uniform(minimum=3, maximum=10) - self.power_prior["mmax"] = Uniform(minimum=30, maximum=100) - self.gauss_prior = PriorDict() - self.gauss_prior["lam"] = Uniform(minimum=0, maximum=1) - self.gauss_prior["mpp"] = Uniform(minimum=20, maximum=60) - self.gauss_prior["sigpp"] = Uniform(minimum=0, maximum=10) - self.double_gauss_prior = PriorDict() - self.double_gauss_prior["lam"] = Uniform(minimum=0, maximum=1) - self.double_gauss_prior["lam_1"] = Uniform(minimum=0, maximum=1) - self.double_gauss_prior["mpp_1"] = Uniform(minimum=20, maximum=60) - self.double_gauss_prior["mpp_2"] = Uniform(minimum=20, maximum=100) - self.double_gauss_prior["sigpp_1"] = Uniform(minimum=0, maximum=10) - self.double_gauss_prior["sigpp_2"] = Uniform(minimum=0, maximum=10) - self.broken_power_prior = PriorDict() - self.broken_power_prior["alpha_1"] = Uniform(minimum=-4, maximum=12) - self.broken_power_prior["alpha_2"] = Uniform(minimum=-4, maximum=12) - self.broken_power_prior["break_fraction"] = Uniform(minimum=0, maximum=1) - self.broken_power_prior["beta"] = Uniform(minimum=-4, maximum=12) - self.broken_power_prior["mmin"] = Uniform(minimum=3, maximum=10) - self.broken_power_prior["mmax"] = Uniform(minimum=30, maximum=100) - self.broken_power_peak_prior = PriorDict() - self.broken_power_peak_prior["alpha_1"] = Uniform(minimum=-4, maximum=12) - self.broken_power_peak_prior["alpha_2"] = Uniform(minimum=-4, maximum=12) - self.broken_power_peak_prior["break_fraction"] = Uniform(minimum=0, maximum=1) - self.broken_power_peak_prior["beta"] = Uniform(minimum=-4, maximum=12) - self.broken_power_peak_prior["mmin"] = Uniform(minimum=3, maximum=10) - self.broken_power_peak_prior["mmax"] = Uniform(minimum=30, maximum=100) - self.broken_power_peak_prior["lam"] = Uniform(minimum=0, maximum=1) - self.broken_power_peak_prior["mpp"] = Uniform(minimum=20, maximum=60) - self.broken_power_peak_prior["sigpp"] = Uniform(minimum=0, maximum=10) - self.smooth_prior = PriorDict() - self.smooth_prior["delta_m"] = Uniform(minimum=0, maximum=10) - self.n_test = 10 - - def test_single_peak_delta_m_zero_matches_two_component_primary_mass_ratio(self): - max_diffs = list() - for ii in range(self.n_test): - parameters = self.power_prior.sample() - parameters.update(self.gauss_prior.sample()) - p_m1 = mass.two_component_primary_mass_ratio(self.dataset, **parameters) - parameters["delta_m"] = 0 - p_m2 = mass.SinglePeakSmoothedMassDistribution()(self.dataset, **parameters) - max_diffs.append(_max_abs_difference(p_m1, p_m2)) - self.assertAlmostEqual(max(max_diffs), 0.0) - - def test_double_peak_delta_m_zero_matches_two_component_primary_mass_ratio(self): - max_diffs = list() - for ii in range(self.n_test): - parameters = self.power_prior.sample() - parameters.update(self.double_gauss_prior.sample()) - del parameters["beta"] - p_m1 = mass.three_component_single( - mass=self.dataset["mass_1"], **parameters - ) - parameters["delta_m"] = 0 - p_m2 = mass.MultiPeakSmoothedMassDistribution().p_m1( - self.dataset, **parameters - ) - max_diffs.append(_max_abs_difference(p_m1, p_m2)) - self.assertAlmostEqual(max(max_diffs), 0.0) - - def test_single_peak_normalised(self): - norms = list() - for ii in range(self.n_test): - parameters = self.power_prior.sample() - parameters.update(self.gauss_prior.sample()) - parameters.update(self.smooth_prior.sample()) - p_m = mass.SinglePeakSmoothedMassDistribution()(self.dataset, **parameters) - norms.append(trapz(trapz(p_m, self.m1s), self.qs)) - self.assertAlmostEqual(_max_abs_difference(norms, 1.0), 0.0, 2) - - def test_double_peak_normalised(self): - norms = list() - for ii in range(self.n_test): - parameters = self.power_prior.sample() - parameters.update(self.double_gauss_prior.sample()) - parameters.update(self.smooth_prior.sample()) - p_m = mass.MultiPeakSmoothedMassDistribution()(self.dataset, **parameters) - norms.append(trapz(trapz(p_m, self.m1s), self.qs)) - self.assertAlmostEqual(_max_abs_difference(norms, 1.0), 0.0, 2) - - def test_broken_power_law_normalised(self): - norms = list() - for ii in range(self.n_test): - parameters = self.broken_power_prior.sample() - parameters.update(self.smooth_prior.sample()) - p_m = mass.BrokenPowerLawSmoothedMassDistribution()( - self.dataset, **parameters - ) - norms.append(trapz(trapz(p_m, self.m1s), self.qs)) - self.assertAlmostEqual(_max_abs_difference(norms, 1.0), 0.0, 2) - - def test_broken_power_law_peak_normalised(self): - norms = list() - for ii in range(self.n_test): - parameters = self.broken_power_peak_prior.sample() - parameters.update(self.smooth_prior.sample()) - p_m = mass.BrokenPowerLawPeakSmoothedMassDistribution()( - self.dataset, **parameters - ) - norms.append(trapz(trapz(p_m, self.m1s), self.qs)) - self.assertAlmostEqual(_max_abs_difference(norms, 1.0), 0.0, 2) - - def test_set_minimum_and_maximum(self): - model = mass.SinglePeakSmoothedMassDistribution(mmin=5, mmax=150) - parameters = self.gauss_prior.sample() - parameters.update(self.power_prior.sample()) - parameters.update(self.smooth_prior.sample()) - parameters["mpp"] = 130 - parameters["sigpp"] = 1 - parameters["lam"] = 0.5 - parameters["mmin"] = 5 - self.assertEqual( - model( - dict(mass_1=8 * np.ones(5), mass_ratio=0.5 * np.ones(5)), **parameters - )[0], - 0, - ) - self.assertGreater( - model( - dict(mass_1=130 * np.ones(5), mass_ratio=0.9 * np.ones(5)), **parameters - )[0], - 0, - ) - - def test_mmin_below_global_minimum_raises_error(self): - model = mass.SinglePeakSmoothedMassDistribution(mmin=5, mmax=150) - parameters = self.gauss_prior.sample() - parameters.update(self.power_prior.sample()) - parameters.update(self.smooth_prior.sample()) - parameters["mmin"] = 2 - with self.assertRaises(ValueError): - model(self.dataset, **parameters) - - def test_mmax_above_global_maximum_raises_error(self): - model = mass.SinglePeakSmoothedMassDistribution(mmin=5, mmax=150) - parameters = self.gauss_prior.sample() - parameters.update(self.power_prior.sample()) - parameters.update(self.smooth_prior.sample()) - parameters["mmax"] = 200 - with self.assertRaises(ValueError): - model(self.dataset, **parameters) - - -def _max_abs_difference(array, comparison): - return float(xp.max(xp.abs(comparison - xp.asarray(array)))) +xp = np +N_TEST = 10 + + +def double_power_prior(): + power_prior = PriorDict() + power_prior["alpha_1"] = Uniform(minimum=-4, maximum=12) + power_prior["alpha_2"] = Uniform(minimum=-4, maximum=12) + power_prior["beta"] = Uniform(minimum=-4, maximum=12) + power_prior["mmin"] = Uniform(minimum=3, maximum=10) + power_prior["mmax"] = Uniform(minimum=40, maximum=100) + power_prior["break_fraction"] = Uniform(minimum=0, maximum=1) + return power_prior + + +def power_prior(): + power_prior = PriorDict() + power_prior["alpha"] = Uniform(minimum=-4, maximum=12) + power_prior["beta"] = Uniform(minimum=-4, maximum=12) + power_prior["mmin"] = Uniform(minimum=3, maximum=10) + power_prior["mmax"] = Uniform(minimum=40, maximum=100) + return power_prior + + +def gauss_prior(): + gauss_prior = PriorDict() + gauss_prior["lam"] = Uniform(minimum=0, maximum=1) + gauss_prior["mpp"] = Uniform(minimum=20, maximum=60) + gauss_prior["sigpp"] = Uniform(minimum=0, maximum=10) + return gauss_prior + + +def double_gauss_prior(): + double_gauss_prior = PriorDict() + double_gauss_prior["lam"] = Uniform(minimum=0, maximum=1) + double_gauss_prior["lam_1"] = Uniform(minimum=0, maximum=1) + double_gauss_prior["mpp_1"] = Uniform(minimum=20, maximum=60) + double_gauss_prior["mpp_2"] = Uniform(minimum=20, maximum=100) + double_gauss_prior["sigpp_1"] = Uniform(minimum=0, maximum=10) + double_gauss_prior["sigpp_2"] = Uniform(minimum=0, maximum=10) + return double_gauss_prior + + +def smooth_prior(): + smooth_prior = PriorDict() + smooth_prior["delta_m"] = Uniform(minimum=0, maximum=10) + return smooth_prior + + +@pytest.mark.parametrize("backend", TEST_BACKENDS) +def test_double_power_law_zero_below_mmin(backend): + gwpopulation.set_backend(backend) + xp = gwpopulation.utils.xp + to_numpy = gwpopulation.utils.to_numpy + m1s, _, _ = get_primary_mass_ratio_data(xp) + prior = double_power_prior() + for ii in range(N_TEST): + parameters = prior.sample() + del parameters["beta"] + p_m = mass.double_power_law_primary_mass(m1s, **parameters) + p_m = to_numpy(p_m) + assert np.max(p_m[to_numpy(m1s) < parameters["mmin"]]) == 0.0 + + +@pytest.mark.parametrize("backend", TEST_BACKENDS) +def test_power_law_primary_mass_ratio_zero_above_mmax(backend): + gwpopulation.set_backend(backend) + xp = gwpopulation.utils.xp + to_numpy = gwpopulation.utils.to_numpy + m1s, qs, dataset = get_primary_mass_ratio_data(xp) + prior = double_power_prior() + m1s = dataset["mass_1"] + for ii in range(N_TEST): + parameters = prior.sample() + p_m = mass.double_power_law_primary_power_law_mass_ratio(dataset, **parameters) + p_m = to_numpy(p_m) + assert np.max(p_m[m1s > parameters["mmax"]]) == 0.0 + + +def get_primary_mass_ratio_data(xp): + m1s = xp.linspace(3, 100, 1000) + qs = xp.linspace(0.01, 1, 500) + m1s_grid, qs_grid = xp.meshgrid(m1s, qs) + dataset = dict(mass_1=m1s_grid, mass_ratio=qs_grid) + return m1s, qs, dataset + + +def get_primary_secondary_data(xp): + ms = xp.linspace(3, 100, 1000) + dm = ms[1] - ms[0] + m1s_grid, m2s_grid = xp.meshgrid(ms, ms) + dataset = dict(mass_1=m1s_grid, mass_2=m2s_grid) + return to_numpy(ms), float(dm), dataset + + +def get_smoothed_data(xp): + m1s = xp.linspace(2, 100, 1000) + qs = xp.linspace(0.01, 1, 500) + m1s_grid, qs_grid = xp.meshgrid(m1s, qs) + dataset = dict(mass_1=m1s_grid, mass_ratio=qs_grid) + return m1s, qs, dataset + + +@pytest.mark.parametrize("backend", TEST_BACKENDS) +def test_power_law_primary_mass_ratio_zero_below_mmin(backend): + gwpopulation.set_backend(backend) + xp = gwpopulation.utils.xp + to_numpy = gwpopulation.utils.to_numpy + _, _, dataset = get_primary_mass_ratio_data(xp) + prior = power_prior() + m2s = dataset["mass_1"] * dataset["mass_ratio"] + for _ in range(N_TEST): + parameters = prior.sample() + p_m = mass.power_law_primary_mass_ratio(dataset, **parameters) + p_m = to_numpy(p_m) + assert np.max(p_m[to_numpy(m2s) < parameters["mmin"]]) == 0.0 + + +@pytest.mark.parametrize("backend", TEST_BACKENDS) +def test_power_law_primary_mass_ratio_zero_above_mmax(backend): + gwpopulation.set_backend(backend) + xp = gwpopulation.utils.xp + to_numpy = gwpopulation.utils.to_numpy + _, _, dataset = get_primary_mass_ratio_data(xp) + prior = power_prior() + m1s = dataset["mass_1"] + for _ in range(N_TEST): + parameters = prior.sample() + p_m = mass.power_law_primary_mass_ratio(dataset, **parameters) + p_m = to_numpy(p_m) + assert np.max(p_m[to_numpy(m1s) > parameters["mmax"]]) == 0.0 + + +@pytest.mark.parametrize("backend", TEST_BACKENDS) +def test_two_component_primary_mass_ratio_zero_below_mmin(backend): + gwpopulation.set_backend(backend) + xp = gwpopulation.utils.xp + to_numpy = gwpopulation.utils.to_numpy + _, _, dataset = get_primary_mass_ratio_data(xp) + prior = power_prior() + prior.update(gauss_prior()) + m2s = dataset["mass_1"] * dataset["mass_ratio"] + for _ in range(N_TEST): + parameters = prior.sample() + p_m = mass.two_component_primary_mass_ratio(dataset, **parameters) + p_m = to_numpy(p_m) + assert np.max(p_m[to_numpy(m2s) <= parameters["mmin"]]) == 0.0 -if __name__ == "__main__": - unittest.main() +@pytest.mark.parametrize("backend", TEST_BACKENDS) +def test_power_law_primary_secondary_zero_below_mmin(backend): + gwpopulation.set_backend(backend) + xp = gwpopulation.utils.xp + to_numpy = gwpopulation.utils.to_numpy + _, _, dataset = get_primary_secondary_data(xp) + prior = power_prior() + m2s = dataset["mass_2"] + for _ in range(N_TEST): + parameters = prior.sample() + p_m = mass.power_law_primary_secondary_independent(dataset, **parameters) + p_m = to_numpy(p_m) + assert np.max(p_m[to_numpy(m2s) <= parameters["mmin"]]) == 0.0 + + +@pytest.mark.parametrize("backend", TEST_BACKENDS) +def test_power_law_primary_secondary_zero_above_mmax(backend): + gwpopulation.set_backend(backend) + xp = gwpopulation.utils.xp + to_numpy = gwpopulation.utils.to_numpy + _, _, dataset = get_primary_secondary_data(xp) + prior = power_prior() + del prior["beta"] + m1s = dataset["mass_1"] + for _ in range(N_TEST): + parameters = prior.sample() + p_m = mass.power_law_primary_secondary_identical(dataset, **parameters) + p_m = to_numpy(p_m) + assert np.max(p_m[to_numpy(m1s) >= parameters["mmax"]]) == 0.0 + + +@pytest.mark.parametrize("backend", TEST_BACKENDS) +def test_two_component_primary_secondary_zero_below_mmin(backend): + gwpopulation.set_backend(backend) + xp = gwpopulation.utils.xp + to_numpy = gwpopulation.utils.to_numpy + _, _, dataset = get_primary_secondary_data(xp) + prior = power_prior() + prior.update(gauss_prior()) + del prior["beta"] + m2s = dataset["mass_2"] + for _ in range(N_TEST): + parameters = prior.sample() + p_m = mass.two_component_primary_secondary_identical(dataset, **parameters) + assert np.max(p_m[to_numpy(m2s) <= parameters["mmin"]]) == 0.0 + + +@pytest.mark.parametrize("backend", TEST_BACKENDS) +def test_single_peak_delta_m_zero_matches_two_component_primary_mass_ratio(backend): + gwpopulation.set_backend(backend) + xp = gwpopulation.utils.xp + _, _, dataset = get_smoothed_data(xp) + max_diffs = list() + prior = power_prior() + prior.update(gauss_prior()) + for _ in range(N_TEST): + parameters = prior.sample() + p_m1 = mass.two_component_primary_mass_ratio(dataset, **parameters) + parameters["delta_m"] = 0 + p_m2 = mass.SinglePeakSmoothedMassDistribution()(dataset, **parameters) + max_diffs.append(_max_abs_difference(p_m1, p_m2, xp=xp)) + assert max(max_diffs) < 1e-5 + + +@pytest.mark.parametrize("backend", TEST_BACKENDS) +def test_double_peak_delta_m_zero_matches_two_component_primary_mass_ratio(backend): + gwpopulation.set_backend(backend) + xp = gwpopulation.utils.xp + _, _, dataset = get_smoothed_data(xp) + max_diffs = list() + prior = power_prior() + prior.update(double_gauss_prior()) + for _ in range(N_TEST): + parameters = prior.sample() + del parameters["beta"] + p_m1 = mass.three_component_single(mass=dataset["mass_1"], **parameters) + parameters["delta_m"] = 0 + p_m2 = mass.MultiPeakSmoothedMassDistribution().p_m1(dataset, **parameters) + max_diffs.append(_max_abs_difference(p_m1, p_m2, xp=xp)) + assert max(max_diffs) < 1e-5 + + +def _normalised(model, prior, xp): + m1s, qs, dataset = get_smoothed_data(xp) + norms = list() + for _ in range(N_TEST): + parameters = prior.sample() + p_m = model(dataset, **parameters) + norms.append(float(xp.trapz(xp.trapz(p_m, m1s), qs))) + assert _max_abs_difference(norms, 1.0, xp=xp) < 0.01 + + +@pytest.mark.parametrize("backend", TEST_BACKENDS) +def test_single_peak_normalised(backend): + gwpopulation.set_backend(backend) + xp = gwpopulation.utils.xp + model = mass.SinglePeakSmoothedMassDistribution() + prior = power_prior() + prior.update(gauss_prior()) + prior.update(smooth_prior()) + _normalised(model, prior, xp) + + +@pytest.mark.parametrize("backend", TEST_BACKENDS) +def test_double_peak_normalised(backend): + gwpopulation.set_backend(backend) + xp = gwpopulation.utils.xp + model = mass.MultiPeakSmoothedMassDistribution() + prior = power_prior() + prior.update(double_gauss_prior()) + prior.update(smooth_prior()) + _normalised(model, prior, xp) + + +@pytest.mark.parametrize("backend", TEST_BACKENDS) +def test_broken_power_law_normalised(backend): + gwpopulation.set_backend(backend) + xp = gwpopulation.utils.xp + model = mass.BrokenPowerLawSmoothedMassDistribution() + prior = double_power_prior() + prior.update(smooth_prior()) + _normalised(model, prior, xp) + + +@pytest.mark.parametrize("backend", TEST_BACKENDS) +def test_broken_power_law_peak_normalised(backend): + gwpopulation.set_backend(backend) + xp = gwpopulation.utils.xp + model = mass.BrokenPowerLawPeakSmoothedMassDistribution() + prior = double_power_prior() + prior.update(smooth_prior()) + prior.update(gauss_prior()) + _normalised(model, prior, xp) + + +@pytest.mark.parametrize("backend", TEST_BACKENDS) +def test_set_minimum_and_maximum(backend): + gwpopulation.set_backend(backend) + xp = gwpopulation.utils.xp + model = mass.SinglePeakSmoothedMassDistribution(mmin=5, mmax=150) + parameters = gauss_prior().sample() + parameters.update(power_prior().sample()) + parameters.update(smooth_prior().sample()) + parameters["mpp"] = 130 + parameters["sigpp"] = 1 + parameters["lam"] = 0.5 + parameters["mmin"] = 6 + assert ( + model(dict(mass_1=8 * xp.ones(6), mass_ratio=0.5 * xp.ones(6)), **parameters)[0] + == 0 + ) + assert ( + model(dict(mass_1=130 * xp.ones(5), mass_ratio=0.9 * xp.ones(5)), **parameters)[ + 0 + ] + > 0 + ) + + +def test_mmin_below_global_minimum_raises_error(): + model = mass.SinglePeakSmoothedMassDistribution(mmin=5, mmax=150) + parameters = gauss_prior().sample() + parameters.update(power_prior().sample()) + parameters.update(smooth_prior().sample()) + parameters["mmin"] = 2 + with pytest.raises(ValueError): + model(dict(mass_1=5, mass_ratio=0.9), **parameters) + + +def test_mmax_above_global_maximum_raises_error(): + model = mass.SinglePeakSmoothedMassDistribution(mmin=5, mmax=150) + parameters = gauss_prior().sample() + parameters.update(power_prior().sample()) + parameters.update(smooth_prior().sample()) + parameters["mmax"] = 200 + with pytest.raises(ValueError): + model(dict(mass_1=5, mass_ratio=0.9), **parameters) + + +def _max_abs_difference(array, comparison, xp=np): + return float(xp.max(xp.abs(comparison - xp.asarray(array)))) diff --git a/test/redshift_test.py b/test/redshift_test.py index 1f65bccd..8d7aec1d 100644 --- a/test/redshift_test.py +++ b/test/redshift_test.py @@ -1,71 +1,75 @@ -import unittest - import numpy as np +import pytest from astropy.cosmology import Planck15 from bilby.core.prior import PriorDict, Uniform -from gwpopulation.cupy_utils import trapz, xp +import gwpopulation from gwpopulation.models import redshift +from . import TEST_BACKENDS + +N_TEST = 100 + + +def _run_model_normalisation(model, priors, xp=np): + zs = xp.linspace(1e-3, 2.3, 1000) + test_data = dict(redshift=zs) + norms = list() + for _ in range(N_TEST): + p_z = model(test_data, **priors.sample()) + norms.append(float(xp.trapz(p_z, zs))) + assert np.max(np.abs(np.array(norms) - 1)) < 1e-3 + -class TestRedshift(unittest.TestCase): - def setUp(self): - self.zs = xp.linspace(1e-3, 2.3, 1000) - self.test_data = dict(redshift=self.zs) - self.n_test = 100 +@pytest.mark.parametrize("backend", TEST_BACKENDS) +def test_powerlaw_normalised(backend): + gwpopulation.set_backend(backend) + xp = gwpopulation.utils.xp + model = redshift.PowerLawRedshift() + priors = PriorDict() + priors["lamb"] = Uniform(-15, 15) + _run_model_normalisation(model=model, priors=priors, xp=xp) - def _run_model_normalisation(self, model, priors): - norms = list() - for _ in range(self.n_test): - p_z = model(self.test_data, **priors.sample()) - norms.append(trapz(p_z, self.zs)) - self.assertAlmostEqual(xp.max(xp.abs(xp.asarray(norms) - 1)), 0.0) - def test_powerlaw_normalised(self): - model = redshift.PowerLawRedshift() - priors = PriorDict() - priors["lamb"] = Uniform(-15, 15) - self._run_model_normalisation(model=model, priors=priors) +@pytest.mark.parametrize("backend", TEST_BACKENDS) +def test_madau_dickinson_normalised(backend): + gwpopulation.set_backend(backend) + xp = gwpopulation.utils.xp + model = redshift.MadauDickinsonRedshift() + priors = PriorDict() + priors["gamma"] = Uniform(-15, 15) + priors["kappa"] = Uniform(-15, 15) + priors["z_peak"] = Uniform(0, 5) + _run_model_normalisation(model=model, priors=priors, xp=xp) - def test_madau_dickinson_normalised(self): - model = redshift.MadauDickinsonRedshift() - priors = PriorDict() - priors["gamma"] = Uniform(-15, 15) - priors["kappa"] = Uniform(-15, 15) - priors["z_peak"] = Uniform(0, 5) - self._run_model_normalisation(model=model, priors=priors) - def test_powerlaw_volume(self): - """ - Test that the total volume matches the expected value for a - trivial case - """ - model = redshift.PowerLawRedshift() - parameters = dict(lamb=1) - total_volume = np.trapz( - Planck15.differential_comoving_volume(self.zs).value * 4 * np.pi, - self.zs, - ) - self.assertAlmostEqual( - total_volume, - model.normalisation(parameters), - places=3, - ) +@pytest.mark.parametrize("backend", TEST_BACKENDS) +def test_powerlaw_volume(backend): + """ + Test that the total volume matches the expected value for a + trivial case + """ + gwpopulation.set_backend(backend) + xp = gwpopulation.utils.xp + zs = xp.linspace(1e-3, 2.3, 1000) + zs_numpy = gwpopulation.utils.to_numpy(zs) + model = redshift.PowerLawRedshift() + parameters = dict(lamb=1) + total_volume = np.trapz( + Planck15.differential_comoving_volume(zs_numpy).value * 4 * np.pi, + zs_numpy, + ) + assert abs(total_volume - float(model.normalisation(parameters))) < 1e-3 - def test_zero_outside_domain(self): - model = redshift.PowerLawRedshift(z_max=1) - parameters = dict(lamb=1) - p_z = model(self.test_data, **parameters) - self.assertTrue(all(p_z[self.zs > 1] == 0)) +def test_zero_outside_domain(): + model = redshift.PowerLawRedshift(z_max=1) + assert model(dict(redshift=5), lamb=1) == 0 -class TestFourVolume(unittest.TestCase): - def setUp(self) -> None: - pass - def test_four_volume(self): - self.assertAlmostEqual( - Planck15.comoving_volume(2.3).value / 1e9, - redshift.total_four_volume(lamb=1, analysis_time=1, max_redshift=2.3), - 4, - ) +def test_four_volume(): + assert ( + Planck15.comoving_volume(2.3).value / 1e9 + - redshift.total_four_volume(lamb=1, analysis_time=1, max_redshift=2.3) + < 1e-3 + ) diff --git a/test/spin_test.py b/test/spin_test.py index 2556b21c..a9d4fd64 100644 --- a/test/spin_test.py +++ b/test/spin_test.py @@ -1,193 +1,226 @@ -import unittest - +import numpy as np +import pytest from bilby.core.prior import PriorDict, Uniform -from gwpopulation.cupy_utils import trapz, xp +import gwpopulation from gwpopulation.models import spin from gwpopulation.utils import truncnorm - -class TestSpinOrientation(unittest.TestCase): - def setUp(self): - self.costilts = xp.linspace(-1, 1, 1000) - self.test_data = dict( - cos_tilt_1=xp.einsum("i,j->ij", self.costilts, xp.ones_like(self.costilts)), - cos_tilt_2=xp.einsum("i,j->ji", self.costilts, xp.ones_like(self.costilts)), - ) - self.prior = PriorDict(dict(xi_spin=Uniform(0, 1), sigma_spin=Uniform(0, 4))) - self.n_test = 100 - - def tearDown(self): - del self.test_data - del self.prior - del self.costilts - del self.n_test - - def test_spin_orientation_normalised(self): - norms = list() - for ii in range(self.n_test): - parameters = self.prior.sample() - temp = spin.iid_spin_orientation_gaussian_isotropic( - self.test_data, **parameters +from . import TEST_BACKENDS + +xp = np +N_TEST = 100 + + +def tilt_prior(): + return PriorDict(dict(xi_spin=Uniform(0, 1), sigma_spin=Uniform(0, 4))) + + +def magnitude_prior(): + return PriorDict( + dict(amax=Uniform(0.3, 1), alpha_chi=Uniform(1, 4), beta_chi=Uniform(1, 4)) + ) + + +def tilt_test_data(xp): + costilts = xp.linspace(-1, 1, 1000) + dataset = dict( + cos_tilt_1=xp.einsum("i,j->ij", costilts, xp.ones_like(costilts)), + cos_tilt_2=xp.einsum("i,j->ji", costilts, xp.ones_like(costilts)), + ) + return costilts, dataset + + +def magnitude_test_data(xp): + a_array = xp.linspace(0, 1, 1000) + dataset = dict( + a_1=xp.einsum("i,j->ij", a_array, xp.ones_like(a_array)), + a_2=xp.einsum("i,j->ji", a_array, xp.ones_like(a_array)), + ) + return a_array, dataset + + +@pytest.mark.parametrize("backend", TEST_BACKENDS) +def test_spin_orientation_normalised(backend): + gwpopulation.set_backend(backend) + xp = gwpopulation.utils.xp + norms = list() + prior = tilt_prior() + costilts, dataset = tilt_test_data(xp) + for ii in range(N_TEST): + parameters = prior.sample() + temp = spin.iid_spin_orientation_gaussian_isotropic(dataset, **parameters) + norms.append(float(xp.trapz(xp.trapz(temp, costilts), costilts))) + assert float(np.max(np.abs(1 - np.asarray(norms)))) < 1e-5 + + +@pytest.mark.parametrize("backend", TEST_BACKENDS) +def test_iid_matches_independent_tilts(backend): + gwpopulation.set_backend(backend) + xp = gwpopulation.utils.xp + iid_params = dict(xi_spin=0.5, sigma_spin=0.5) + ind_params = dict(xi_spin=0.5, sigma_1=0.5, sigma_2=0.5) + _, dataset = tilt_test_data(xp) + assert ( + xp.max( + spin.iid_spin_orientation_gaussian_isotropic(dataset, **iid_params) + - spin.independent_spin_orientation_gaussian_isotropic( + dataset, **ind_params ) - norms.append(trapz(trapz(temp, self.costilts), self.costilts)) - self.assertAlmostEqual(float(xp.max(xp.abs(1 - xp.asarray(norms)))), 0, 5) - - def test_iid_matches_independent_tilts(self): - iid_params = dict(xi_spin=0.5, sigma_spin=0.5) - ind_params = dict(xi_spin=0.5, sigma_1=0.5, sigma_2=0.5) - self.assertEqual( - 0.0, - xp.max( - spin.iid_spin_orientation_gaussian_isotropic( - self.test_data, **iid_params - ) - - spin.independent_spin_orientation_gaussian_isotropic( - self.test_data, **ind_params - ) - ), - ) - - -class TestSpinMagnitude(unittest.TestCase): - def setUp(self): - self.a_array = xp.linspace(0, 1, 1000) - self.test_data = dict( - a_1=xp.einsum("i,j->ij", self.a_array, xp.ones_like(self.a_array)), - a_2=xp.einsum("i,j->ji", self.a_array, xp.ones_like(self.a_array)), ) - self.prior = PriorDict( - dict(amax=Uniform(0.3, 1), alpha_chi=Uniform(1, 4), beta_chi=Uniform(1, 4)) - ) - self.n_test = 100 - - def tearDown(self): - del self.test_data - del self.prior - del self.a_array - del self.n_test - - def test_spin_magnitude_normalised(self): - norms = list() - for ii in range(self.n_test): - parameters = self.prior.sample() - temp = spin.iid_spin_magnitude_beta(self.test_data, **parameters) - norms.append(trapz(trapz(temp, self.a_array), self.a_array)) - self.assertAlmostEqual(float(xp.max(xp.abs(1 - xp.asarray(norms)))), 0, 1) - - def test_returns_zero_alpha_beta_less_zero(self): - parameters = self.prior.sample() - for key in ["alpha_chi", "beta_chi"]: - parameters[key] = -1 - self.assertEqual( - spin.iid_spin_magnitude_beta(self.test_data, **parameters), 0 - ) - - def test_iid_matches_independent_magnitudes(self): - iid_params = self.prior.sample() - ind_params = dict() - ind_params.update({key + "_1": iid_params[key] for key in iid_params}) - ind_params.update({key + "_2": iid_params[key] for key in iid_params}) - self.assertEqual( - 0.0, + == 0 + ) + + +@pytest.mark.parametrize("backend", TEST_BACKENDS) +def test_spin_magnitude_normalised(backend): + gwpopulation.set_backend(backend) + xp = gwpopulation.utils.xp + prior = magnitude_prior() + a_array, dataset = magnitude_test_data(xp) + norms = list() + for ii in range(N_TEST): + parameters = prior.sample() + temp = spin.iid_spin_magnitude_beta(dataset, **parameters) + norms.append(xp.trapz(xp.trapz(temp, a_array), a_array)) + assert float(xp.max(xp.abs(1 - xp.asarray(norms)))) < 1e-2 + + +@pytest.mark.parametrize("backend", TEST_BACKENDS) +def test_returns_zero_alpha_beta_less_zero(backend): + gwpopulation.set_backend(backend) + xp = gwpopulation.utils.xp + prior = magnitude_prior() + a_array, dataset = magnitude_test_data(xp) + parameters = prior.sample() + for key in ["alpha_chi", "beta_chi"]: + parameters[key] = -1 + assert np.all(spin.iid_spin_magnitude_beta(dataset, **parameters) == 0) + + +@pytest.mark.parametrize("backend", TEST_BACKENDS) +def test_iid_matches_independent_magnitudes(backend): + gwpopulation.set_backend(backend) + xp = gwpopulation.utils.xp + prior = magnitude_prior() + a_array, dataset = magnitude_test_data(xp) + iid_params = prior.sample() + ind_params = dict() + ind_params.update({key + "_1": iid_params[key] for key in iid_params}) + ind_params.update({key + "_2": iid_params[key] for key in iid_params}) + assert ( + float( xp.max( - spin.iid_spin_magnitude_beta(self.test_data, **iid_params) - - spin.independent_spin_magnitude_beta(self.test_data, **ind_params) - ), - ) - - -class TestIIDSpin(unittest.TestCase): - def setUp(self): - self.a_array = xp.linspace(0, 1, 1000) - self.costilts = xp.linspace(-1, 1, 1000) - self.test_data = dict( - a_1=xp.einsum("i,j->ij", self.a_array, xp.ones_like(self.a_array)), - a_2=xp.einsum("i,j->ji", self.a_array, xp.ones_like(self.a_array)), - cos_tilt_1=xp.einsum("i,j->ij", self.costilts, xp.ones_like(self.costilts)), - cos_tilt_2=xp.einsum("i,j->ji", self.costilts, xp.ones_like(self.costilts)), - ) - self.prior = PriorDict( - dict( - amax=Uniform(0.3, 1), - alpha_chi=Uniform(1, 4), - beta_chi=Uniform(1, 4), - xi_spin=Uniform(0, 1), - sigma_spin=Uniform(0, 4), + spin.iid_spin_magnitude_beta(dataset, **iid_params) + - spin.independent_spin_magnitude_beta(dataset, **ind_params) ) ) - self.n_test = 100 - - def test_iid_matches_independent(self): - params = self.prior.sample() - mag_params = {key: params[key] for key in ["amax", "alpha_chi", "beta_chi"]} - tilt_params = {key: params[key] for key in ["xi_spin", "sigma_spin"]} - self.assertEqual( - 0.0, + < 1e-5 + ) + + +@pytest.mark.parametrize("backend", TEST_BACKENDS) +def test_iid_matches_independent(backend): + gwpopulation.set_backend(backend) + xp = gwpopulation.utils.xp + test_data = dict() + costilts, new_data = tilt_test_data(xp) + test_data.update(new_data) + a_array, new_data = magnitude_test_data(xp) + test_data.update(new_data) + prior = magnitude_prior() + prior.update(tilt_prior()) + params = prior.sample() + mag_params = {key: params[key] for key in ["amax", "alpha_chi", "beta_chi"]} + tilt_params = {key: params[key] for key in ["xi_spin", "sigma_spin"]} + assert ( + float( xp.max( - spin.iid_spin(self.test_data, **params) - - spin.iid_spin_magnitude_beta(self.test_data, **mag_params) - * spin.iid_spin_orientation_gaussian_isotropic( - self.test_data, **tilt_params - ) - ), + spin.iid_spin(test_data, **params) + - spin.iid_spin_magnitude_beta(test_data, **mag_params) + * spin.iid_spin_orientation_gaussian_isotropic(test_data, **tilt_params) + ) ) + < 1e-5 + ) -class TestGaussianSpin(unittest.TestCase): - def setUp(self) -> None: - pass - - def test_gaussian_chi_eff(self): - self.assertTrue( - xp.all( +@pytest.mark.parametrize("backend", TEST_BACKENDS) +def test_gaussian_chi_eff(backend): + gwpopulation.set_backend(backend) + xp = gwpopulation.utils.xp + assert ( + float( + xp.max( spin.gaussian_chi_eff( dict(chi_eff=xp.linspace(-2, 2, 1001)), mu_chi_eff=0.4, sigma_chi_eff=0.1, ) - == truncnorm( - xp.linspace(-2, 2, 1001), mu=0.4, sigma=0.1, low=-1, high=1 - ) + - truncnorm(xp.linspace(-2, 2, 1001), mu=0.4, sigma=0.1, low=-1, high=1) ) ) + < 1e-5 + ) - def test_gaussian_chi_p(self): - self.assertTrue( - xp.all( + +@pytest.mark.parametrize("backend", TEST_BACKENDS) +def test_gaussian_chi_p(backend): + gwpopulation.set_backend(backend) + xp = gwpopulation.utils.xp + assert ( + float( + xp.max( spin.gaussian_chi_p( dict(chi_p=xp.linspace(-2, 2, 1001)), mu_chi_p=0.4, sigma_chi_p=0.1 ) - == truncnorm(xp.linspace(-2, 2, 1001), mu=0.4, sigma=0.1, low=0, high=1) + - truncnorm(xp.linspace(-2, 2, 1001), mu=0.4, sigma=0.1, low=0, high=1) ) ) - - def test_2d_gaussian_normalized(self): - model = spin.GaussianChiEffChiP() - chi_eff, chi_p = xp.meshgrid(xp.linspace(-1, 1, 501), xp.linspace(0, 1, 300)) - parameters = dict( - mu_chi_eff=0.1, - mu_chi_p=0.3, - sigma_chi_eff=0.6, - sigma_chi_p=0.5, - spin_covariance=0.9, - ) - prob = model(dict(chi_eff=chi_eff, chi_p=chi_p), **parameters) - self.assertAlmostEqual( - xp.trapz(xp.trapz(prob, xp.linspace(-1, 1, 501)), xp.linspace(0, 1, 300)), - 1, - 5, + == 0 + ) + + +@pytest.mark.parametrize("backend", TEST_BACKENDS) +def test_2d_gaussian_normalized(backend): + gwpopulation.set_backend(backend) + xp = gwpopulation.utils.xp + model = spin.GaussianChiEffChiP() + chi_eff, chi_p = xp.meshgrid(xp.linspace(-1, 1, 501), xp.linspace(0, 1, 300)) + parameters = dict( + mu_chi_eff=0.1, + mu_chi_p=0.3, + sigma_chi_eff=0.6, + sigma_chi_p=0.5, + spin_covariance=0.9, + ) + prob = model(dict(chi_eff=chi_eff, chi_p=chi_p), **parameters) + assert ( + xp.max( + xp.abs( + xp.trapz( + xp.trapz(prob, xp.linspace(-1, 1, 501)), xp.linspace(0, 1, 300) + ) + - 1 + ) ) - - def test_2d_gaussian_no_covariance_matches_independent(self): - model = spin.GaussianChiEffChiP() - chi_eff, chi_p = xp.meshgrid(xp.linspace(-1, 1, 501), xp.linspace(0, 1, 300)) - data = dict(chi_eff=chi_eff, chi_p=chi_p) - self.assertTrue( - xp.all( + < 1e-3 + ) + + +@pytest.mark.parametrize("backend", TEST_BACKENDS) +def test_2d_gaussian_no_covariance_matches_independent(backend): + gwpopulation.set_backend(backend) + xp = gwpopulation.utils.xp + model = spin.GaussianChiEffChiP() + chi_eff, chi_p = xp.meshgrid(xp.linspace(-1, 1, 501), xp.linspace(0, 1, 300)) + data = dict(chi_eff=chi_eff, chi_p=chi_p) + assert ( + xp.max( + xp.abs( spin.gaussian_chi_eff(data, mu_chi_eff=0.6, sigma_chi_eff=0.2) * spin.gaussian_chi_p(data, mu_chi_p=0.4, sigma_chi_p=0.1) - == model( + - model( data, mu_chi_eff=0.6, mu_chi_p=0.4, @@ -197,3 +230,5 @@ def test_2d_gaussian_no_covariance_matches_independent(self): ) ) ) + < 1e-3 + ) diff --git a/test/utils_test.py b/test/utils_test.py index 11e62521..266b930c 100644 --- a/test/utils_test.py +++ b/test/utils_test.py @@ -1,137 +1,121 @@ -import unittest - import numpy as np +import pytest from scipy.stats import vonmises +import gwpopulation from gwpopulation import utils +from . import TEST_BACKENDS + +N_TEST = 100 + + +def test_beta_dist_zero_below_zero(): + equal_zero = True + for _ in range(N_TEST): + vals = utils.beta_dist(-1, *np.random.uniform(0, 10, 3)) + equal_zero = equal_zero & (vals == 0.0) + assert equal_zero + + +def test_beta_dist_zero_above_scale(): + equal_zero = True + for _ in range(N_TEST): + vals = utils.beta_dist(20, *np.random.uniform(0, 10, 3)) + equal_zero = equal_zero & (vals == 0.0) + assert equal_zero + + +def test_beta_dist_alpha_below_zero_raises_value_error(): + with pytest.raises(ValueError): + utils.beta_dist(xx=0.5, alpha=-1, beta=1, scale=1) + + +def test_beta_dist_beta_below_zero_raises_value_error(): + with pytest.raises(ValueError): + utils.beta_dist(xx=0.5, alpha=1, beta=-1, scale=1) + + +def test_powerlaw_zero_below_low(): + equal_zero = True + for ii in range(N_TEST): + vals = utils.powerlaw( + xx=1, + alpha=np.random.uniform(-10, 10), + low=np.random.uniform(5, 15), + high=np.random.uniform(20, 30), + ) + equal_zero = equal_zero & (vals == 0.0) + assert equal_zero + + +def test_powerlaw_zero_above_high(): + equal_zero = True + for ii in range(N_TEST): + vals = utils.powerlaw( + xx=40, + alpha=np.random.uniform(-10, 10), + low=np.random.uniform(5, 15), + high=np.random.uniform(20, 30), + ) + equal_zero = equal_zero & (vals == 0.0) + assert equal_zero + + +def test_powerlaw_low_below_zero_raises_value_error(): + with pytest.raises(ValueError): + utils.powerlaw(xx=0, alpha=3, high=10, low=-4) + + +def test_powerlaw_alpha_equal_zero(): + assert utils.powerlaw(xx=1.0, alpha=-1, low=0.5, high=2) == 1 / np.log(4) -class TestBetaDist(unittest.TestCase): - def setUp(self): - self.n_test = 100 - self.alphas = np.random.uniform(0, 10, self.n_test) - self.betas = np.random.uniform(0, 10, self.n_test) - self.scales = np.random.uniform(0, 10, self.n_test) - pass - - def test_beta_dist_zero_below_zero(self): - equal_zero = True - for ii in range(self.n_test): - vals = utils.beta_dist( - xx=-1, alpha=self.alphas[ii], beta=self.betas[ii], scale=self.scales[ii] - ) - equal_zero = equal_zero & (vals == 0.0) - self.assertTrue(equal_zero) - - def test_beta_dist_zero_above_scale(self): - equal_zero = True - for ii in range(self.n_test): - xx = self.scales[ii] + 1 - vals = utils.beta_dist( - xx=xx, alpha=self.alphas[ii], beta=self.betas[ii], scale=self.scales[ii] - ) - equal_zero = equal_zero & (vals == 0.0) - self.assertTrue(equal_zero) - - def test_beta_dist_alpha_below_zero_raises_value_error(self): - with self.assertRaises(ValueError): - utils.beta_dist(xx=0.5, alpha=-1, beta=1, scale=1) - - def test_beta_dist_beta_below_zero_raises_value_error(self): - with self.assertRaises(ValueError): - utils.beta_dist(xx=0.5, alpha=1, beta=-1, scale=1) - - -class TestPowerLaw(unittest.TestCase): - def setUp(self): - self.n_test = 100 - self.alphas = np.random.uniform(-10, 10, self.n_test) - self.lows = np.random.uniform(5, 15, self.n_test) - self.highs = np.random.uniform(20, 30, self.n_test) - - def test_powerlaw_zero_below_low(self): - equal_zero = True - for ii in range(self.n_test): - xx = self.lows[ii] - 1 - vals = utils.powerlaw( - xx=xx, alpha=self.alphas[ii], low=self.lows[ii], high=self.highs[ii] - ) - equal_zero = equal_zero & (vals == 0.0) - self.assertTrue(equal_zero) - - def test_powerlaw_zero_above_high(self): - equal_zero = True - for ii in range(self.n_test): - xx = self.highs[ii] + 1 - vals = utils.powerlaw( - xx=xx, alpha=self.alphas[ii], low=self.lows[ii], high=self.highs[ii] - ) - equal_zero = equal_zero & (vals == 0.0) - self.assertTrue(equal_zero) - - def test_powerlaw_low_below_zero_raises_value_error(self): - with self.assertRaises(ValueError): - utils.powerlaw(xx=0, alpha=3, high=10, low=-4) - - def test_powerlaw_alpha_equal_zero(self): - self.assertEqual( - utils.powerlaw(xx=1.0, alpha=-1, low=0.5, high=2), 1 / np.log(4) + +def test_truncnorm_zero_below_low(): + equal_zero = True + for _ in range(N_TEST): + vals = utils.truncnorm( + -40, + mu=np.random.uniform(-10, 10), + sigma=np.random.uniform(0, 10), + low=np.random.uniform(-30, -20), + high=np.random.uniform(20, 30), + ) + equal_zero = equal_zero & (vals == 0.0) + assert equal_zero + + +def test_truncnorm_zero_above_high(): + equal_zero = True + for _ in range(N_TEST): + vals = utils.truncnorm( + 40, + mu=np.random.uniform(-10, 10), + sigma=np.random.uniform(0, 10), + low=np.random.uniform(-30, -20), + high=np.random.uniform(20, 30), ) + equal_zero = equal_zero & (vals == 0.0) + assert equal_zero + + +def test_truncnorm_sigma_below_zero_raises_value_error(): + with pytest.raises(ValueError): + utils.truncnorm(xx=0, mu=0, sigma=-1, high=10, low=-10) + + +@pytest.mark.parametrize("backend", TEST_BACKENDS) +def test_matches_scipy(backend): + gwpopulation.set_backend(backend) + xp = gwpopulation.utils.xp + xx = xp.linspace(0, 2 * np.pi, 1000) + for ii in range(N_TEST): + mu = np.random.uniform(-np.pi, np.pi) + kappa = np.random.uniform(0, 15) + gwpop_vals = utils.to_numpy(utils.von_mises(xx, mu, kappa)) + scipy_vals = vonmises(kappa=kappa, loc=mu).pdf(utils.to_numpy(xx)) + assert max(abs(gwpop_vals - scipy_vals)) < 1e-3 -class TestTruncNorm(unittest.TestCase): - def setUp(self): - self.n_test = 100 - self.mus = np.random.uniform(-10, 10, self.n_test) - self.sigmas = np.random.uniform(0, 10, self.n_test) - self.lows = np.random.uniform(-30, -20, self.n_test) - self.highs = np.random.uniform(20, 30, self.n_test) - pass - - def test_truncnorm_zero_below_low(self): - equal_zero = True - for ii in range(self.n_test): - xx = self.lows[ii] - 1 - vals = utils.truncnorm( - xx=xx, - mu=self.mus[ii], - sigma=self.sigmas[ii], - low=self.lows[ii], - high=self.highs[ii], - ) - equal_zero = equal_zero & (vals == 0.0) - self.assertTrue(equal_zero) - - def test_truncnorm_zero_above_high(self): - equal_zero = True - for ii in range(self.n_test): - xx = self.highs[ii] + 1 - vals = utils.truncnorm( - xx=xx, - mu=self.mus[ii], - sigma=self.sigmas[ii], - low=self.lows[ii], - high=self.highs[ii], - ) - equal_zero = equal_zero & (vals == 0.0) - self.assertTrue(equal_zero) - - def test_truncnorm_sigma_below_zero_raises_value_error(self): - with self.assertRaises(ValueError): - utils.truncnorm(xx=0, mu=0, sigma=-1, high=10, low=-10) - - -class TestVonMises(unittest.TestCase): - def setUp(self): - self.n_test = 100 - self.mus = np.random.uniform(-np.pi, np.pi, self.n_test) - self.kappas = np.random.uniform(0, 15, self.n_test) - self.xx = np.linspace(0, 2 * np.pi, 1000) - - def test_matches_scipy(self): - for ii in range(self.n_test): - mu = self.mus[ii] - kappa = self.kappas[ii] - gwpop_vals = utils.von_mises(self.xx, mu, kappa) - scipy_vals = vonmises(kappa=kappa, loc=mu).pdf(self.xx) - self.assertAlmostEqual(max(abs(gwpop_vals - scipy_vals)), 0) +def test_get_version(): + assert gwpopulation.__version__ == utils.get_version_information() diff --git a/test/vt_test.py b/test/vt_test.py index 9b26bb16..1bf00d62 100644 --- a/test/vt_test.py +++ b/test/vt_test.py @@ -1,81 +1,110 @@ -import unittest - +import numpy as np +import pytest from bilby.hyper.model import Model +import gwpopulation from gwpopulation import vt -from gwpopulation.cupy_utils import xp from gwpopulation.models.redshift import PowerLawRedshift, total_four_volume +from . import TEST_BACKENDS + +xp = np + def dummy_function(dataset, alpha): return 1 -class TestBase(unittest.TestCase): - def setUp(self) -> None: - pass +def test_initialize_with_function(): + evaluator = vt._BaseVT(model=dummy_function, data=dict()) + assert evaluator.model.models == [dummy_function] - def test_initialize_with_function(self): - evaluator = vt._BaseVT(model=dummy_function, data=dict()) - self.assertTrue(evaluator.model.models == [dummy_function]) - def test_initialize_with_list_of_functions(self): - evaluator = vt._BaseVT(model=[dummy_function, dummy_function], data=dict()) - self.assertTrue(evaluator.model.models == [dummy_function, dummy_function]) +def test_initialize_with_list_of_functions(): + evaluator = vt._BaseVT(model=[dummy_function, dummy_function], data=dict()) + assert evaluator.model.models == [dummy_function, dummy_function] - def test_initialize_with_bilby_model(self): - model = Model([dummy_function, dummy_function]) - evaluator = vt._BaseVT(model=model, data=dict()) - self.assertTrue(evaluator.model.models == [dummy_function, dummy_function]) - def test_base_cannot_be_called(self): - model = Model([dummy_function, dummy_function]) - evaluator = vt._BaseVT(model=model, data=dict()) - with self.assertRaises(NotImplementedError): - evaluator() +def test_initialize_with_bilby_model(): + model = Model([dummy_function, dummy_function]) + evaluator = vt._BaseVT(model=model, data=dict()) + assert evaluator.model.models == [dummy_function, dummy_function] -class TestGridVT(unittest.TestCase): - def setUp(self): - model = lambda dataset: xp.ones_like(dataset["a"]) - data = dict(a=xp.linspace(0, 1, 1000), vt=2) - self.n_test = 100 - self.vt = vt.GridVT(model, data) +def test_base_cannot_be_called(): + model = Model([dummy_function, dummy_function]) + evaluator = vt._BaseVT(model=model, data=dict()) + with pytest.raises(NotImplementedError): + evaluator() - def test_vt_correct(self): - self.assertEqual(self.vt(dict()), 2) +@pytest.mark.parametrize("backend", TEST_BACKENDS) +def test_grid_vt_correct(backend): + gwpopulation.set_backend(backend) + xp = gwpopulation.utils.xp + model = lambda dataset: xp.ones_like(dataset["a"]) + data = dict(a=xp.linspace(0, 1, 1000), vt=2) + assert float(vt.GridVT(model, data)(dict())) == 2 -class TestResamplingVT(unittest.TestCase): - def setUp(self) -> None: - model = ( - lambda dataset: xp.exp(-((dataset["a"] - 0.5) ** 2) / 2) - / (2 * xp.pi) ** 0.5 - ) - self.data = dict(a=xp.linspace(0, 1, 1000), prior=xp.ones(1000), vt=2) - self.vt = vt.ResamplingVT(data=self.data, model=model, n_events=0) - def test_marginalized_vt_correct(self): - self.assertEqual(self.vt.vt_factor(dict()), 0.38289325179141254) +def resampling_data(xp): + return dict(a=xp.linspace(0, 1, 1000), prior=xp.ones(1000), vt=2) - def test_vt_correct(self): - self.assertEqual(self.vt(dict())[0], 0.38289403358409585) - def test_returns_inf_when_n_effective_too_small(self): - self.vt.n_events = xp.inf - self.assertEqual(self.vt.vt_factor(dict()), xp.inf) +def get_vt(xp): + data = resampling_data(xp) + model = lambda dataset: ( + xp.exp(-((dataset["a"] - 0.5) ** 2) / 2) / (2 * xp.pi) ** 0.5 + ) + evaluator = vt.ResamplingVT(data=data, model=model, n_events=0) + return evaluator + + +@pytest.mark.parametrize("backend", TEST_BACKENDS) +def test_marginalized_vt_correct(backend): + gwpopulation.set_backend(backend) + xp = gwpopulation.utils.xp + assert abs(float(get_vt(xp).vt_factor(dict())) - 0.38289403358409585) < 1e-6 + + +@pytest.mark.parametrize("backend", TEST_BACKENDS) +def test_resampling_vt_correct(backend): + gwpopulation.set_backend(backend) + xp = gwpopulation.utils.xp + assert abs(float(get_vt(xp)(dict())[0]) - 0.38289325179141254) < 1e-6 + + +@pytest.mark.parametrize("backend", TEST_BACKENDS) +def test_returns_inf_when_n_effective_too_small(backend): + gwpopulation.set_backend(backend) + xp = gwpopulation.utils.xp + evaluator = get_vt(xp) + evaluator.marginalize_uncertainty = True + evaluator.n_events = xp.inf + assert evaluator(dict()) == xp.inf + + +@pytest.mark.parametrize("backend", TEST_BACKENDS) +def test_observed_volume_with_no_redshift_model(backend): + gwpopulation.set_backend(backend) + xp = gwpopulation.utils.xp + evaluator = get_vt(xp) + assert evaluator.surveyed_hypervolume(dict()) == total_four_volume( + lamb=0, analysis_time=1 + ) - def test_observed_volume_with_no_redshift_model(self): - self.assertEqual( - self.vt.surveyed_hypervolume(dict()), - total_four_volume(lamb=0, analysis_time=1), - ) - def test_observed_volume_with_redshift_model(self): - model = PowerLawRedshift() - self.vt = vt.ResamplingVT(data=self.data, model=model, n_events=0) - self.assertAlmostEqual( - self.vt.surveyed_hypervolume(dict(lamb=4)), - total_four_volume(lamb=4, analysis_time=1), - 4, +@pytest.mark.parametrize("backend", TEST_BACKENDS) +def test_observed_volume_with_redshift_model(backend): + gwpopulation.set_backend(backend) + xp = gwpopulation.utils.xp + data = resampling_data(xp) + model = PowerLawRedshift() + evaluator = vt.ResamplingVT(data=data, model=model, n_events=0) + assert ( + abs( + evaluator.surveyed_hypervolume(dict(lamb=4)) + - total_four_volume(lamb=4, analysis_time=1) ) + < 1e-4 + )