Skip to content

Commit

Permalink
Replace ZeroInflated distributions with Mixtures
Browse files Browse the repository at this point in the history
  • Loading branch information
ricardoV94 committed Mar 11, 2022
1 parent 27fd32f commit 9771af3
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 387 deletions.
342 changes: 61 additions & 281 deletions pymc/distributions/discrete.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@
normal_lcdf,
)
from pymc.distributions.distribution import Discrete
from pymc.distributions.logprob import logcdf, logp
from pymc.distributions.logprob import logp
from pymc.distributions.mixture import Mixture
from pymc.distributions.shape_utils import rv_size_is_none
from pymc.math import sigmoid
from pymc.vartypes import continuous_types
Expand Down Expand Up @@ -1384,22 +1385,7 @@ def logcdf(value, c):
)


class ZeroInflatedPoissonRV(RandomVariable):
name = "zero_inflated_poisson"
ndim_supp = 0
ndims_params = [0, 0]
dtype = "int64"
_print_name = ("ZeroInflatedPois", "\\operatorname{ZeroInflatedPois}")

@classmethod
def rng_fn(cls, rng, psi, lam, size):
return rng.poisson(lam, size=size) * (rng.random(size=size) < psi)


zero_inflated_poisson = ZeroInflatedPoissonRV()


class ZeroInflatedPoisson(Discrete):
class ZeroInflatedPoisson:
R"""
Zero-inflated Poisson log-likelihood.
Expand Down Expand Up @@ -1450,97 +1436,32 @@ class ZeroInflatedPoisson(Discrete):
(theta >= 0).
"""

rv_op = zero_inflated_poisson

@classmethod
def dist(cls, psi, theta, *args, **kwargs):
def __new__(cls, name, psi, theta, **kwargs):
psi = at.as_tensor_variable(floatX(psi))
theta = at.as_tensor_variable(floatX(theta))
return super().dist([psi, theta], *args, **kwargs)

def get_moment(rv, size, psi, theta):
mean = at.floor(psi * theta)
if not rv_size_is_none(size):
mean = at.full(size, mean)
return mean

def logp(value, psi, theta):
r"""
Calculate log-probability of ZeroInflatedPoisson distribution at specified value.
Parameters
----------
value: numeric
Value(s) for which log-probability is calculated. If the log probabilities for multiple
values are desired the values must be provided in a numpy array or Aesara tensor
Returns
-------
TensorVariable
"""

res = at.switch(
at.gt(value, 0),
at.log(psi) + logp(Poisson.dist(mu=theta), value),
at.logaddexp(at.log1p(-psi), at.log(psi) - theta),
)

res = at.switch(at.lt(value, 0), -np.inf, res)

return check_parameters(
res,
0 <= psi,
psi <= 1,
0 <= theta,
msg="0 <= psi <= 1, theta >= 0",
)

def logcdf(value, psi, theta):
"""
Compute the log of the cumulative distribution function for ZeroInflatedPoisson distribution
at the specified value.
Parameters
----------
value: numeric or np.ndarray or aesara.tensor
Value(s) for which log CDF is calculated. If the log CDF for multiple
values are desired the values must be provided in a numpy array or Aesara tensor.
Returns
-------
TensorVariable
"""

res = at.switch(
at.lt(value, 0),
-np.inf,
at.logaddexp(
at.log1p(-psi),
at.log(psi) + logcdf(Poisson.dist(mu=theta), value),
),
)

return check_parameters(
res, 0 <= psi, psi <= 1, 0 <= theta, msg="0 <= psi <= 1, theta >= 0"
return Mixture(
name,
w=at.stack([1 - psi, psi], axis=-1),
comp_dists=[
Constant.dist(0),
Poisson.dist(mu=theta),
],
**kwargs,
)


class ZeroInflatedBinomialRV(RandomVariable):
name = "zero_inflated_binomial"
ndim_supp = 0
ndims_params = [0, 0, 0]
dtype = "int64"
_print_name = ("ZeroInflatedBinom", "\\operatorname{ZeroInflatedBinom}")

@classmethod
def rng_fn(cls, rng, psi, n, p, size):
return rng.binomial(n=n, p=p, size=size) * (rng.random(size=size) < psi)


zero_inflated_binomial = ZeroInflatedBinomialRV()
def dist(cls, psi, theta, **kwargs):
psi = at.as_tensor_variable(floatX(psi))
return Mixture.dist(
w=at.stack([1 - psi, psi], axis=-1),
comp_dists=[
Constant.dist(0),
Poisson.dist(mu=theta),
],
**kwargs,
)


class ZeroInflatedBinomial(Discrete):
class ZeroInflatedBinomial:
R"""
Zero-inflated Binomial log-likelihood.
Expand Down Expand Up @@ -1592,107 +1513,29 @@ class ZeroInflatedBinomial(Discrete):
"""

rv_op = zero_inflated_binomial

@classmethod
def dist(cls, psi, n, p, *args, **kwargs):
def __new__(cls, name, psi, n, p, **kwargs):
psi = at.as_tensor_variable(floatX(psi))
n = at.as_tensor_variable(intX(n))
p = at.as_tensor_variable(floatX(p))
return super().dist([psi, n, p], *args, **kwargs)

def get_moment(rv, size, psi, n, p):
mean = at.round(psi * n * p)
if not rv_size_is_none(size):
mean = at.full(size, mean)
return mean

def logp(value, psi, n, p):
r"""
Calculate log-probability of ZeroInflatedBinomial distribution at specified value.
Parameters
----------
value: numeric
Value(s) for which log-probability is calculated. If the log probabilities for multiple
values are desired the values must be provided in a numpy array or Aesara tensor
Returns
-------
TensorVariable
"""

res = at.switch(
at.gt(value, 0),
at.log(psi) + logp(Binomial.dist(n=n, p=p), value),
at.logaddexp(at.log1p(-psi), at.log(psi) + n * at.log1p(-p)),
)

res = at.switch(
at.lt(value, 0),
-np.inf,
res,
)

return check_parameters(
res,
0 <= psi,
psi <= 1,
0 <= p,
p <= 1,
msg="0 <= psi <= 1, 0 <= p <= 1",
)

def logcdf(value, psi, n, p):
"""
Compute the log of the cumulative distribution function for ZeroInflatedBinomial distribution
at the specified value.
Parameters
----------
value: numeric or np.ndarray or aesara.tensor
Value(s) for which log CDF is calculated. If the log CDF for multiple
values are desired the values must be provided in a numpy array or Aesara tensor.
Returns
-------
TensorVariable
"""
res = at.switch(
at.or_(at.lt(value, 0), at.gt(value, n)),
-np.inf,
at.logaddexp(
at.log1p(-psi),
at.log(psi) + logcdf(Binomial.dist(n=n, p=p), value),
),
)

return check_parameters(
res,
0 <= psi,
psi <= 1,
0 <= p,
p <= 1,
msg="0 <= psi <= 1, 0 <= p <= 1",
return Mixture(
name,
w=at.stack([1 - psi, psi], axis=-1),
comp_dists=[
Constant.dist(0),
Binomial.dist(n=n, p=p),
],
**kwargs,
)


class ZeroInflatedNegBinomialRV(RandomVariable):
name = "zero_inflated_neg_binomial"
ndim_supp = 0
ndims_params = [0, 0, 0]
dtype = "int64"
_print_name = (
"ZeroInflatedNegBinom",
"\\operatorname{ZeroInflatedNegBinom}",
)

@classmethod
def rng_fn(cls, rng, psi, n, p, size):
return rng.negative_binomial(n=n, p=p, size=size) * (rng.random(size=size) < psi)


zero_inflated_neg_binomial = ZeroInflatedNegBinomialRV()
def dist(cls, psi, n, p, **kwargs):
psi = at.as_tensor_variable(floatX(psi))
return Mixture.dist(
w=at.stack([1 - psi, psi], axis=-1),
comp_dists=[
Constant.dist(0),
Binomial.dist(n=n, p=p),
],
**kwargs,
)


class ZeroInflatedNegativeBinomial(Discrete):
Expand Down Expand Up @@ -1776,91 +1619,28 @@ def ZeroInfNegBinom(a, m, psi, x):
Alternative number of target success trials (n > 0)
"""

rv_op = zero_inflated_neg_binomial

@classmethod
def dist(cls, psi, mu=None, alpha=None, p=None, n=None, *args, **kwargs):
def __new__(cls, name, psi, mu=None, alpha=None, p=None, n=None, **kwargs):
psi = at.as_tensor_variable(floatX(psi))
n, p = NegativeBinomial.get_n_p(mu=mu, alpha=alpha, p=p, n=n)
n = at.as_tensor_variable(floatX(n))
p = at.as_tensor_variable(floatX(p))
return super().dist([psi, n, p], *args, **kwargs)

def get_moment(rv, size, psi, n, p):
mean = at.floor(psi * n * (1 - p) / p)
if not rv_size_is_none(size):
mean = at.full(size, mean)
return mean

def logp(value, psi, n, p):
r"""
Calculate log-probability of ZeroInflatedNegativeBinomial distribution at specified value.
Parameters
----------
value: numeric
Value(s) for which log-probability is calculated. If the log probabilities for multiple
values are desired the values must be provided in a numpy array or Aesara tensor
Returns
-------
TensorVariable
"""

res = at.switch(
at.gt(value, 0),
at.log(psi) + logp(NegativeBinomial.dist(n=n, p=p), value),
at.logaddexp(at.log1p(-psi), at.log(psi) + n * at.log(p)),
)

res = at.switch(
at.lt(value, 0),
-np.inf,
res,
)

return check_parameters(
res,
0 <= psi,
psi <= 1,
0 < n,
0 <= p,
p <= 1,
msg="0 <= psi <= 1, n > 0, 0 <= p <= 1",
)

def logcdf(value, psi, n, p):
"""
Compute the log of the cumulative distribution function for ZeroInflatedNegativeBinomial distribution
at the specified value.
Parameters
----------
value: numeric or np.ndarray or aesara.tensor
Value(s) for which log CDF is calculated. If the log CDF for multiple
values are desired the values must be provided in a numpy array or Aesara tensor.
Returns
-------
TensorVariable
"""
res = at.switch(
at.lt(value, 0),
-np.inf,
at.logaddexp(
at.log1p(-psi),
at.log(psi) + logcdf(NegativeBinomial.dist(n=n, p=p), value),
),
return Mixture(
name,
w=at.stack([1 - psi, psi], axis=-1),
comp_dists=[
Constant.dist(0),
NegativeBinomial.dist(mu=mu, alpha=alpha, p=p, n=n),
],
**kwargs,
)

return check_parameters(
res,
0 <= psi,
psi <= 1,
0 < n,
0 < p,
p <= 1,
msg="0 <= psi <= 1, n > 0, 0 < p <= 1",
@classmethod
def dist(cls, psi, mu=None, alpha=None, p=None, n=None, **kwargs):
psi = at.as_tensor_variable(floatX(psi))
return Mixture.dist(
w=at.stack([1 - psi, psi], axis=-1),
comp_dists=[
Constant.dist(0),
NegativeBinomial.dist(mu=mu, alpha=alpha, p=p, n=n),
],
**kwargs,
)


Expand Down
Loading

0 comments on commit 9771af3

Please sign in to comment.