Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow MCMC transforms for MultipleIndependentConstraints prior #619

Merged
merged 3 commits into from
Jan 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 20 additions & 9 deletions sbi/utils/sbiutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -506,23 +506,34 @@ def mcmc_transform(
try:
_ = prior.support
has_support = True
except NotImplementedError:
except NotImplementedError or AttributeError:
# NotImplementedError -> Distribution that inherits from torch dist but
# does not implement support.
# AttributeError -> Custom distribution that has no support attribute.
warnings.warn(
"""The passed prior has no support property, transform will be
constructed from mean and std. If the passed prior is supposed to be
bounded consider implementing the prior.support property."""
)
has_support = False

# TODO: implement case for half-bounded priors, e.g., LogNormal.
# If the distribution has a `support`, check if the support is bounded.
# If it is not bounded, we want to z-score the space. This is not done
# by `biject_to()`, so we have to deal with this case separately.
if has_support:
if hasattr(prior.support, "base_constraint"):
constraint = prior.support.base_constraint
else:
constraint = prior.support
if isinstance(constraint, constraints._Real):
support_is_bounded = False
else:
support_is_bounded = True
else:
support_is_bounded = False

# Prior with bounded support, e.g., uniform priors.
if has_support and (
isinstance(prior.support, constraints.interval) or
# support can be hidden in independent constraints (e.g., for BoxUniform,
# or custom prior with bounds).
(isinstance(prior.support, constraints._IndependentConstraint)
and isinstance(prior.support.base_constraint, constraints.interval))
):
if has_support and support_is_bounded:
transform = biject_to(prior.support)
# For all other cases build affine transform with mean and std.
else:
Expand Down
2 changes: 1 addition & 1 deletion sbi/utils/user_input_checks.py
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ def check_prior_support(prior):
"""

try:
within_support(prior, prior.sample())
within_support(prior, prior.sample((1,)))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With a MultipleIndependent distribution support.check(sample) requires a batched sample

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

except NotImplementedError:
raise NotImplementedError(
"""The prior must implement the support property or allow to call
Expand Down
66 changes: 19 additions & 47 deletions sbi/utils/user_input_checks_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from scipy.stats._multivariate import multi_rv_frozen
from torch import Tensor, float32
from torch.distributions import Distribution, constraints
from torch.distributions import Distribution, Independent, biject_to, constraints


class CustomPriorWrapper(Distribution):
Expand Down Expand Up @@ -324,56 +325,27 @@ def variance(self) -> Tensor:

@property
def support(self):
# return independent constraints for each distribution.
return MultipleIndependentConstraints(
multiple_constraints=[d.support for d in self.dists],
dims_per_constraint=self.dims_per_dist,
# First, we remove all `independent` constraints. This applies to e.g.
# `MultivariateNormal`. An `independent` constraint returns a 1D `[True]`
# when `.support.check(sample)` is called, whereas distributions that are
# not `independent` (e.g. `Gamma`), return a 2D `[[True]]`. When such
# constraints would be combined with the `constraint.cat(..., dim=1)`, it
# fails because the `independent` constraint returned only a 1D `[True]`.
supports = []
for d in self.dists:
if isinstance(d.support, constraints.independent):
supports.append(d.support.base_constraint)
else:
supports.append(d.support)

# Wrap as `independent` in order to have the correct shape of the
# `log_abs_det`, i.e. summed over the parameter dimensions.
return constraints.independent(
constraints.cat(supports, dim=1, lengths=self.dims_per_dist),
reinterpreted_batch_ndims=1,
)


class MultipleIndependentConstraints(constraints.Constraint):
def __init__(
self,
multiple_constraints: Sequence[constraints.Constraint],
dims_per_constraint: list,
):
"""Define multiple independent constraints to check support of independent
joint distributions.

Args:
constraints: List of constraints, one for each distribution.
dims_per_constraint: List of event dimensions for each distribution, needed
to match values to constraints.
"""

for c in multiple_constraints:
assert isinstance(c, constraints.Constraint)
self.constraints = multiple_constraints
self.dims_per_constraint = dims_per_constraint

def check(self, value: Tensor) -> Tensor:
"""Returns a byte tensor of ``sample_shape + batch_shape`` indicating
whether each event in value satisfies its corresponding constraint."""

if value.ndim < 2:
value = value.unsqueeze(0)
result = torch.zeros((value.shape[0], len(self.constraints)))
dim_idx = 0
# For each constraint, select the corresponding values according to its
# event dim.
for idx, c in enumerate(self.constraints):
# reshape and squeeze needed to accommodate different types of
# distributions.
result[:, idx] = c.check(
value[:, dim_idx : (dim_idx + self.dims_per_constraint[idx])].reshape(
-1, self.dims_per_constraint[idx]
)
).squeeze()
dim_idx += self.dims_per_constraint[idx]
# Return check across all independent constraints.
return result.all(-1)


def build_support(
lower_bound: Optional[Tensor] = None, upper_bound: Optional[Tensor] = None
) -> constraints.Constraint:
Expand Down
37 changes: 1 addition & 36 deletions tests/sbiutils_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,13 @@
import pytest
import torch
from torch import Tensor, eye, ones, zeros
from torch.distributions import Exponential, LogNormal, MultivariateNormal
from torch.distributions import MultivariateNormal
from torch.distributions.transforms import IndependentTransform, identity_transform

from sbi.inference import SNPE, SNPE_A
from sbi.inference.snpe.snpe_a import SNPE_A_MDN
from sbi.utils import (
BoxUniform,
MultipleIndependent,
get_kde,
mcmc_transform,
posterior_nn,
Expand Down Expand Up @@ -363,40 +362,6 @@ def log_prob(self, theta):
plt.show()


@pytest.mark.parametrize(
"prior, enable_transform",
(
(BoxUniform(zeros(5), ones(5)), True),
(BoxUniform(zeros(1), ones(1)), True),
(BoxUniform(zeros(5), ones(5)), False),
(MultivariateNormal(zeros(5), eye(5)), True),
(Exponential(rate=ones(1)), True),
(LogNormal(zeros(1), ones(1)), True),
(
MultipleIndependent(
[Exponential(rate=ones(1)), BoxUniform(zeros(5), ones(5))]
),
True,
),
),
)
def test_mcmc_transform(prior, enable_transform):
"""
Test whether the transform for MCMC returns the log_abs_det in the correct shape.
"""

num_samples = 1000
prior, _, _ = process_prior(prior)
tf = mcmc_transform(prior, enable_transform=enable_transform)

samples = prior.sample((num_samples,))
unconstrained_samples = tf(samples)
samples_original = tf.inv(unconstrained_samples)

log_abs_det = tf.log_abs_det_jacobian(samples_original, unconstrained_samples)
assert log_abs_det.shape == torch.Size([num_samples])


@pytest.mark.parametrize(
"transform",
(
Expand Down
84 changes: 84 additions & 0 deletions tests/transforms_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import pytest
import torch
from torch import ones, zeros, eye
from torch.distributions import Uniform, MultivariateNormal, LogNormal, Exponential
from torch.distributions.transforms import (
AffineTransform,
ComposeTransform,
ExpTransform,
IndependentTransform,
SigmoidTransform,
)

from sbi.utils import BoxUniform, mcmc_transform, process_prior, MultipleIndependent
from tests.user_input_checks_test import UserNumpyUniform


@pytest.mark.parametrize(
"prior, target_transform",
(
(Uniform(-torch.ones(1), torch.ones(1)), SigmoidTransform),
(BoxUniform(-torch.ones(2), torch.ones(2)), SigmoidTransform),
(UserNumpyUniform(torch.zeros(2), torch.ones(2)), SigmoidTransform),
(MultivariateNormal(torch.zeros(2), torch.eye(2)), AffineTransform),
(LogNormal(loc=torch.zeros(1), scale=torch.ones(1)), ExpTransform),
),
)
def test_transforms(prior, target_transform):

if isinstance(prior, UserNumpyUniform):
prior, *_ = process_prior(
prior,
dict(lower_bound=torch.zeros(2), upper_bound=torch.ones(2)),
)

transform = mcmc_transform(prior)
core_transform = transform._inv

if isinstance(core_transform, IndependentTransform):
core_transform = core_transform.base_transform

if hasattr(core_transform, "parts"):
transform_to_inspect = core_transform.parts[0]
else:
transform_to_inspect = core_transform

assert isinstance(transform_to_inspect, target_transform)

samples = prior.sample((2,))
transformed_samples = transform(samples)
assert torch.allclose(samples, transform.inv(transformed_samples))


@pytest.mark.parametrize(
"prior, enable_transform",
(
(BoxUniform(zeros(5), ones(5)), True),
(BoxUniform(zeros(1), ones(1)), True),
(BoxUniform(zeros(5), ones(5)), False),
(MultivariateNormal(zeros(5), eye(5)), True),
(Exponential(rate=ones(1)), True),
(LogNormal(zeros(1), ones(1)), True),
(
MultipleIndependent(
[Exponential(rate=ones(1)), BoxUniform(zeros(5), ones(5))]
),
True,
),
),
)
def test_mcmc_transform(prior, enable_transform):
"""
Test whether the transform for MCMC returns the log_abs_det in the correct shape.
"""

num_samples = 1000
prior, _, _ = process_prior(prior)
tf = mcmc_transform(prior, enable_transform=enable_transform)

samples = prior.sample((num_samples,))
unconstrained_samples = tf(samples)
samples_original = tf.inv(unconstrained_samples)

log_abs_det = tf.log_abs_det_jacobian(samples_original, unconstrained_samples)
assert log_abs_det.shape == torch.Size([num_samples])
3 changes: 2 additions & 1 deletion tests/user_input_checks_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from sbi.inference.posteriors.mcmc_posterior import MCMCPosterior
import torch
from pyknos.mdn.mdn import MultivariateGaussianMDN
from scipy.stats import beta, multivariate_normal, uniform
from scipy.stats import beta, multivariate_normal, uniform, lognorm
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you could move this test to transform_tests.py, or copy it to keep both.

from torch import Tensor, eye, nn, ones, zeros
from torch.distributions import Beta, Distribution, Gamma, MultivariateNormal, Uniform

Expand Down Expand Up @@ -94,6 +94,7 @@ def matrix_simulator(theta):
dict(lower_bound=zeros(3), upper_bound=ones(3)),
),
(ScipyPytorchWrapper, multivariate_normal(), dict()),
(ScipyPytorchWrapper, lognorm(s=1.0), dict()),
(
ScipyPytorchWrapper,
uniform(),
Expand Down