diff --git a/pymc/distributions/discrete.py b/pymc/distributions/discrete.py index ace21dc25b6..06e06fbf993 100644 --- a/pymc/distributions/discrete.py +++ b/pymc/distributions/discrete.py @@ -43,7 +43,8 @@ normal_lcdf, ) from pymc.distributions.distribution import Discrete -from pymc.distributions.logprob import logcdf, logp +from pymc.distributions.logprob import logp +from pymc.distributions.mixture import Mixture from pymc.distributions.shape_utils import rv_size_is_none from pymc.math import sigmoid from pymc.vartypes import continuous_types @@ -1384,22 +1385,7 @@ def logcdf(value, c): ) -class ZeroInflatedPoissonRV(RandomVariable): - name = "zero_inflated_poisson" - ndim_supp = 0 - ndims_params = [0, 0] - dtype = "int64" - _print_name = ("ZeroInflatedPois", "\\operatorname{ZeroInflatedPois}") - - @classmethod - def rng_fn(cls, rng, psi, lam, size): - return rng.poisson(lam, size=size) * (rng.random(size=size) < psi) - - -zero_inflated_poisson = ZeroInflatedPoissonRV() - - -class ZeroInflatedPoisson(Discrete): +class ZeroInflatedPoisson: R""" Zero-inflated Poisson log-likelihood. @@ -1450,97 +1436,32 @@ class ZeroInflatedPoisson(Discrete): (theta >= 0). """ - rv_op = zero_inflated_poisson - - @classmethod - def dist(cls, psi, theta, *args, **kwargs): + def __new__(cls, name, psi, theta, **kwargs): psi = at.as_tensor_variable(floatX(psi)) - theta = at.as_tensor_variable(floatX(theta)) - return super().dist([psi, theta], *args, **kwargs) - - def get_moment(rv, size, psi, theta): - mean = at.floor(psi * theta) - if not rv_size_is_none(size): - mean = at.full(size, mean) - return mean - - def logp(value, psi, theta): - r""" - Calculate log-probability of ZeroInflatedPoisson distribution at specified value. - - Parameters - ---------- - value: numeric - Value(s) for which log-probability is calculated. If the log probabilities for multiple - values are desired the values must be provided in a numpy array or Aesara tensor - - Returns - ------- - TensorVariable - """ - - res = at.switch( - at.gt(value, 0), - at.log(psi) + logp(Poisson.dist(mu=theta), value), - at.logaddexp(at.log1p(-psi), at.log(psi) - theta), - ) - - res = at.switch(at.lt(value, 0), -np.inf, res) - - return check_parameters( - res, - 0 <= psi, - psi <= 1, - 0 <= theta, - msg="0 <= psi <= 1, theta >= 0", - ) - - def logcdf(value, psi, theta): - """ - Compute the log of the cumulative distribution function for ZeroInflatedPoisson distribution - at the specified value. - - Parameters - ---------- - value: numeric or np.ndarray or aesara.tensor - Value(s) for which log CDF is calculated. If the log CDF for multiple - values are desired the values must be provided in a numpy array or Aesara tensor. - - Returns - ------- - TensorVariable - """ - - res = at.switch( - at.lt(value, 0), - -np.inf, - at.logaddexp( - at.log1p(-psi), - at.log(psi) + logcdf(Poisson.dist(mu=theta), value), - ), - ) - - return check_parameters( - res, 0 <= psi, psi <= 1, 0 <= theta, msg="0 <= psi <= 1, theta >= 0" + return Mixture( + name, + w=at.stack([1 - psi, psi], axis=-1), + comp_dists=[ + Constant.dist(0), + Poisson.dist(mu=theta), + ], + **kwargs, ) - -class ZeroInflatedBinomialRV(RandomVariable): - name = "zero_inflated_binomial" - ndim_supp = 0 - ndims_params = [0, 0, 0] - dtype = "int64" - _print_name = ("ZeroInflatedBinom", "\\operatorname{ZeroInflatedBinom}") - @classmethod - def rng_fn(cls, rng, psi, n, p, size): - return rng.binomial(n=n, p=p, size=size) * (rng.random(size=size) < psi) - - -zero_inflated_binomial = ZeroInflatedBinomialRV() + def dist(cls, psi, theta, **kwargs): + psi = at.as_tensor_variable(floatX(psi)) + return Mixture.dist( + w=at.stack([1 - psi, psi], axis=-1), + comp_dists=[ + Constant.dist(0), + Poisson.dist(mu=theta), + ], + **kwargs, + ) -class ZeroInflatedBinomial(Discrete): +class ZeroInflatedBinomial: R""" Zero-inflated Binomial log-likelihood. @@ -1592,107 +1513,29 @@ class ZeroInflatedBinomial(Discrete): """ - rv_op = zero_inflated_binomial - - @classmethod - def dist(cls, psi, n, p, *args, **kwargs): + def __new__(cls, name, psi, n, p, **kwargs): psi = at.as_tensor_variable(floatX(psi)) - n = at.as_tensor_variable(intX(n)) - p = at.as_tensor_variable(floatX(p)) - return super().dist([psi, n, p], *args, **kwargs) - - def get_moment(rv, size, psi, n, p): - mean = at.round(psi * n * p) - if not rv_size_is_none(size): - mean = at.full(size, mean) - return mean - - def logp(value, psi, n, p): - r""" - Calculate log-probability of ZeroInflatedBinomial distribution at specified value. - - Parameters - ---------- - value: numeric - Value(s) for which log-probability is calculated. If the log probabilities for multiple - values are desired the values must be provided in a numpy array or Aesara tensor - - Returns - ------- - TensorVariable - """ - - res = at.switch( - at.gt(value, 0), - at.log(psi) + logp(Binomial.dist(n=n, p=p), value), - at.logaddexp(at.log1p(-psi), at.log(psi) + n * at.log1p(-p)), - ) - - res = at.switch( - at.lt(value, 0), - -np.inf, - res, - ) - - return check_parameters( - res, - 0 <= psi, - psi <= 1, - 0 <= p, - p <= 1, - msg="0 <= psi <= 1, 0 <= p <= 1", - ) - - def logcdf(value, psi, n, p): - """ - Compute the log of the cumulative distribution function for ZeroInflatedBinomial distribution - at the specified value. - - Parameters - ---------- - value: numeric or np.ndarray or aesara.tensor - Value(s) for which log CDF is calculated. If the log CDF for multiple - values are desired the values must be provided in a numpy array or Aesara tensor. - - Returns - ------- - TensorVariable - """ - res = at.switch( - at.or_(at.lt(value, 0), at.gt(value, n)), - -np.inf, - at.logaddexp( - at.log1p(-psi), - at.log(psi) + logcdf(Binomial.dist(n=n, p=p), value), - ), - ) - - return check_parameters( - res, - 0 <= psi, - psi <= 1, - 0 <= p, - p <= 1, - msg="0 <= psi <= 1, 0 <= p <= 1", + return Mixture( + name, + w=at.stack([1 - psi, psi], axis=-1), + comp_dists=[ + Constant.dist(0), + Binomial.dist(n=n, p=p), + ], + **kwargs, ) - -class ZeroInflatedNegBinomialRV(RandomVariable): - name = "zero_inflated_neg_binomial" - ndim_supp = 0 - ndims_params = [0, 0, 0] - dtype = "int64" - _print_name = ( - "ZeroInflatedNegBinom", - "\\operatorname{ZeroInflatedNegBinom}", - ) - @classmethod - def rng_fn(cls, rng, psi, n, p, size): - return rng.negative_binomial(n=n, p=p, size=size) * (rng.random(size=size) < psi) - - -zero_inflated_neg_binomial = ZeroInflatedNegBinomialRV() + def dist(cls, psi, n, p, **kwargs): + psi = at.as_tensor_variable(floatX(psi)) + return Mixture.dist( + w=at.stack([1 - psi, psi], axis=-1), + comp_dists=[ + Constant.dist(0), + Binomial.dist(n=n, p=p), + ], + **kwargs, + ) class ZeroInflatedNegativeBinomial(Discrete): @@ -1776,91 +1619,28 @@ def ZeroInfNegBinom(a, m, psi, x): Alternative number of target success trials (n > 0) """ - rv_op = zero_inflated_neg_binomial - - @classmethod - def dist(cls, psi, mu=None, alpha=None, p=None, n=None, *args, **kwargs): + def __new__(cls, name, psi, mu=None, alpha=None, p=None, n=None, **kwargs): psi = at.as_tensor_variable(floatX(psi)) - n, p = NegativeBinomial.get_n_p(mu=mu, alpha=alpha, p=p, n=n) - n = at.as_tensor_variable(floatX(n)) - p = at.as_tensor_variable(floatX(p)) - return super().dist([psi, n, p], *args, **kwargs) - - def get_moment(rv, size, psi, n, p): - mean = at.floor(psi * n * (1 - p) / p) - if not rv_size_is_none(size): - mean = at.full(size, mean) - return mean - - def logp(value, psi, n, p): - r""" - Calculate log-probability of ZeroInflatedNegativeBinomial distribution at specified value. - - Parameters - ---------- - value: numeric - Value(s) for which log-probability is calculated. If the log probabilities for multiple - values are desired the values must be provided in a numpy array or Aesara tensor - - Returns - ------- - TensorVariable - """ - - res = at.switch( - at.gt(value, 0), - at.log(psi) + logp(NegativeBinomial.dist(n=n, p=p), value), - at.logaddexp(at.log1p(-psi), at.log(psi) + n * at.log(p)), - ) - - res = at.switch( - at.lt(value, 0), - -np.inf, - res, - ) - - return check_parameters( - res, - 0 <= psi, - psi <= 1, - 0 < n, - 0 <= p, - p <= 1, - msg="0 <= psi <= 1, n > 0, 0 <= p <= 1", - ) - - def logcdf(value, psi, n, p): - """ - Compute the log of the cumulative distribution function for ZeroInflatedNegativeBinomial distribution - at the specified value. - - Parameters - ---------- - value: numeric or np.ndarray or aesara.tensor - Value(s) for which log CDF is calculated. If the log CDF for multiple - values are desired the values must be provided in a numpy array or Aesara tensor. - - Returns - ------- - TensorVariable - """ - res = at.switch( - at.lt(value, 0), - -np.inf, - at.logaddexp( - at.log1p(-psi), - at.log(psi) + logcdf(NegativeBinomial.dist(n=n, p=p), value), - ), + return Mixture( + name, + w=at.stack([1 - psi, psi], axis=-1), + comp_dists=[ + Constant.dist(0), + NegativeBinomial.dist(mu=mu, alpha=alpha, p=p, n=n), + ], + **kwargs, ) - return check_parameters( - res, - 0 <= psi, - psi <= 1, - 0 < n, - 0 < p, - p <= 1, - msg="0 <= psi <= 1, n > 0, 0 < p <= 1", + @classmethod + def dist(cls, psi, mu=None, alpha=None, p=None, n=None, **kwargs): + psi = at.as_tensor_variable(floatX(psi)) + return Mixture.dist( + w=at.stack([1 - psi, psi], axis=-1), + comp_dists=[ + Constant.dist(0), + NegativeBinomial.dist(mu=mu, alpha=alpha, p=p, n=n), + ], + **kwargs, ) diff --git a/pymc/tests/test_distributions.py b/pymc/tests/test_distributions.py index e159d0b0298..9e72447a794 100644 --- a/pymc/tests/test_distributions.py +++ b/pymc/tests/test_distributions.py @@ -934,7 +934,9 @@ def check_selfconsistency_discrete_logcdf( Check that logcdf of discrete distributions matches sum of logps up to value """ # This test only works for scalar random variables - assert distribution.rv_op.ndim_supp == 0 + rv_op = getattr(distribution, "rv_op", None) + if rv_op: + assert rv_op.ndim_supp == 0 domains = paramdomains.copy() domains["value"] = domain diff --git a/pymc/tests/test_distributions_random.py b/pymc/tests/test_distributions_random.py index 14669704f35..6789fd39dbe 100644 --- a/pymc/tests/test_distributions_random.py +++ b/pymc/tests/test_distributions_random.py @@ -25,6 +25,8 @@ from numpy.testing import assert_almost_equal, assert_array_almost_equal +from pymc.vartypes import discrete_types + try: from polyagamma import random_polyagamma @@ -1581,111 +1583,6 @@ def check_dtype(self): assert pm.Constant.dist(2.0).dtype == aesara.config.floatX -class TestZeroInflatedPoisson(BaseTestDistributionRandom): - def zero_inflated_poisson_rng_fn(self, size, psi, theta, poisson_rng_fct, random_rng_fct): - return poisson_rng_fct(theta, size=size) * (random_rng_fct(size=size) < psi) - - def seeded_zero_inflated_poisson_rng_fn(self): - poisson_rng_fct = functools.partial( - getattr(np.random.RandomState, "poisson"), self.get_random_state() - ) - - random_rng_fct = functools.partial( - getattr(np.random.RandomState, "random"), self.get_random_state() - ) - - return functools.partial( - self.zero_inflated_poisson_rng_fn, - poisson_rng_fct=poisson_rng_fct, - random_rng_fct=random_rng_fct, - ) - - pymc_dist = pm.ZeroInflatedPoisson - pymc_dist_params = {"psi": 0.9, "theta": 4.0} - expected_rv_op_params = {"psi": 0.9, "theta": 4.0} - reference_dist_params = {"psi": 0.9, "theta": 4.0} - reference_dist = seeded_zero_inflated_poisson_rng_fn - checks_to_run = [ - "check_pymc_params_match_rv_op", - "check_pymc_draws_match_reference", - "check_rv_size", - ] - - -class TestZeroInflatedBinomial(BaseTestDistributionRandom): - def zero_inflated_binomial_rng_fn(self, size, psi, n, p, binomial_rng_fct, random_rng_fct): - return binomial_rng_fct(n, p, size=size) * (random_rng_fct(size=size) < psi) - - def seeded_zero_inflated_binomial_rng_fn(self): - binomial_rng_fct = functools.partial( - getattr(np.random.RandomState, "binomial"), self.get_random_state() - ) - - random_rng_fct = functools.partial( - getattr(np.random.RandomState, "random"), self.get_random_state() - ) - - return functools.partial( - self.zero_inflated_binomial_rng_fn, - binomial_rng_fct=binomial_rng_fct, - random_rng_fct=random_rng_fct, - ) - - pymc_dist = pm.ZeroInflatedBinomial - pymc_dist_params = {"psi": 0.9, "n": 12, "p": 0.7} - expected_rv_op_params = {"psi": 0.9, "n": 12, "p": 0.7} - reference_dist_params = {"psi": 0.9, "n": 12, "p": 0.7} - reference_dist = seeded_zero_inflated_binomial_rng_fn - checks_to_run = [ - "check_pymc_params_match_rv_op", - "check_pymc_draws_match_reference", - "check_rv_size", - ] - - -class TestZeroInflatedNegativeBinomialMuSigma(BaseTestDistributionRandom): - def zero_inflated_negbinomial_rng_fn( - self, size, psi, n, p, negbinomial_rng_fct, random_rng_fct - ): - return negbinomial_rng_fct(n, p, size=size) * (random_rng_fct(size=size) < psi) - - def seeded_zero_inflated_negbinomial_rng_fn(self): - negbinomial_rng_fct = functools.partial( - getattr(np.random.RandomState, "negative_binomial"), self.get_random_state() - ) - - random_rng_fct = functools.partial( - getattr(np.random.RandomState, "random"), self.get_random_state() - ) - - return functools.partial( - self.zero_inflated_negbinomial_rng_fn, - negbinomial_rng_fct=negbinomial_rng_fct, - random_rng_fct=random_rng_fct, - ) - - n, p = pm.NegativeBinomial.get_n_p(mu=3, alpha=5) - - pymc_dist = pm.ZeroInflatedNegativeBinomial - pymc_dist_params = {"psi": 0.9, "mu": 3, "alpha": 5} - expected_rv_op_params = {"psi": 0.9, "n": n, "p": p} - reference_dist_params = {"psi": 0.9, "n": n, "p": p} - reference_dist = seeded_zero_inflated_negbinomial_rng_fn - checks_to_run = [ - "check_pymc_params_match_rv_op", - "check_pymc_draws_match_reference", - "check_rv_size", - ] - - -class TestZeroInflatedNegativeBinomial(BaseTestDistributionRandom): - pymc_dist = pm.ZeroInflatedNegativeBinomial - pymc_dist_params = {"psi": 0.9, "n": 12, "p": 0.7} - expected_rv_op_params = {"psi": 0.9, "n": 12, "p": 0.7} - reference_dist_params = {"psi": 0.9, "n": 12, "p": 0.7} - checks_to_run = ["check_pymc_params_match_rv_op"] - - class TestOrderedLogistic(BaseTestDistributionRandom): pymc_dist = _OrderedLogistic pymc_dist_params = {"eta": 0, "cutpoints": np.array([-2, 0, 2])} @@ -2501,3 +2398,17 @@ def test_car_rng_fn(sparse): ) f -= 1 assert p > delta + + +@pytest.mark.parametrize( + "dist, non_psi_args", + [ + (pm.ZeroInflatedPoisson.dist, (2,)), + (pm.ZeroInflatedBinomial.dist, (2, 0.5)), + (pm.ZeroInflatedNegativeBinomial.dist, (2, 2)), + ], +) +def test_zero_inflated_dists_dtype_and_broadcast(dist, non_psi_args): + x = dist([0.5, 0.5, 0.5], *non_psi_args) + assert x.dtype in discrete_types + assert x.eval().shape == (3,)