Skip to content

Commit

Permalink
Add a unit test for P-value of jackpotted samples
Browse files Browse the repository at this point in the history
  • Loading branch information
matthewfallan committed Aug 9, 2024
1 parent d41661a commit 7cad76c
Showing 1 changed file with 62 additions and 27 deletions.
89 changes: 62 additions & 27 deletions src/seismicrna/cluster/tests/em_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -397,42 +397,77 @@ def test_mixed_sum_exp_over_min_exp_less_than_sum_obs(self):
self.assertEqual(df, 4)


def sim_reads(n: int, mus: np.ndarray):
return rng.random((n, get_length(mus))) < mus


def count_uniq_reads(reads: np.ndarray):
uniq_reads, num_obs = np.unique(reads, axis=0, return_counts=True)
return uniq_reads, num_obs


def calc_log_exp(n: int, mus: np.ndarray, uniq_reads: np.ndarray):
return np.log(n) + np.sum(np.where(uniq_reads,
np.log(mus),
np.log(1. - mus)),
axis=1)


def sim_obs_exp_mixture(n: int,
mus1: np.ndarray,
mus2: np.ndarray,
pi2: float):
""" Simulate observed and expected counts from a mixture of two
sets of mutation rates. """
n2 = round(n * pi2)
n1 = n - n2
reads = np.vstack([sim_reads(n1, mus1),
sim_reads(n2, mus2)])
uniq_reads, num_obs = count_uniq_reads(reads)
# Calculate the expected counts using the average of mus1 and mus2.
# The discrepancy between this average and the actual underlying
# structure of the simulated reads is what produces the appearance
# of jackpotting.
mus = (n1 / n) * mus1 + (n2 / n) * mus2
log_exp = calc_log_exp(n, mus, uniq_reads)
return num_obs, log_exp


class TestJackpottingPValue(ut.TestCase):

@staticmethod
def sim_obs_exp(num_reads: int, p_mut: np.ndarray):
""" Simulate observed and expected counts. """
reads = rng.random((num_reads, get_length(p_mut))) < p_mut
uniq_reads, num_obs = np.unique(reads, axis=0, return_counts=True)
log_exp = np.log(num_reads) + np.sum(np.where(uniq_reads,
np.log(p_mut),
np.log(1. - p_mut)),
axis=1)
return num_obs, log_exp

def test_no_jackpotting(self):
def simulate(pi2: float,
n_trials: int = 1024,
n_reads: int = 10000,
read_length: int = 24,
beta_a: float = 2.,
beta_b: float = 98.):
p_values = list()
for _ in range(n_trials):
mus1 = rng.beta(beta_a, beta_b, size=read_length)
mus2 = rng.beta(beta_a, beta_b, size=read_length)
num_obs, log_exp = sim_obs_exp_mixture(n_reads, mus1, mus2, pi2)
g_stat, df = _calc_jackpotting_g_stat(num_obs, log_exp)
p_value = _calc_jackpotting_p_value(g_stat, df)
p_values.append(p_value)
return np.array(p_values)

def test_without_jackpotting(self):
""" Test that the P-value distribution is uniform for samples
with no jackpotting. """
n_trials = 1000
n_reads_per_trial = 10000
read_length = 30
beta_a = 2.
beta_b = 98.
pi2 = 0.0
alpha = 0.01
p_values = np.array([
_calc_jackpotting_p_value(
*_calc_jackpotting_g_stat(
*self.sim_obs_exp(
n_reads_per_trial,
rng.beta(beta_a, beta_b, size=read_length)
)
)
)
for _ in range(n_trials)
])
p_values = self.simulate(pi2)
result = kstest(p_values, "uniform", nan_policy="raise")
self.assertGreaterEqual(result.pvalue, alpha)

def test_with_jackpotting(self):
pi2 = 0.15
alpha = 0.01
p_values = self.simulate(pi2)
result = kstest(p_values, "uniform", nan_policy="raise")
self.assertLess(result.pvalue, alpha)


if __name__ == "__main__":
ut.main()

0 comments on commit 7cad76c

Please sign in to comment.