Skip to content

Commit

Permalink
Documentation, bugfixes
Browse files Browse the repository at this point in the history
  • Loading branch information
Michal Kahle committed Jun 30, 2023
1 parent 75550f5 commit a3a62cb
Showing 1 changed file with 141 additions and 38 deletions.
179 changes: 141 additions & 38 deletions statistikem.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,28 +12,69 @@
FONTSIZE = 10
ALPHA = 0.05

def compare(var_list, grouping=None, data=None, plot=True, scales=None, sort=None, **kwa):
if type(var_list) == str:
var_list = [var_list]
if type(scales) == str:
scales = [scales] * len(var_list)
elif type(scales) == list:
scales = scales + [None] * (len(var_list - len(scales)))
def compare(variables, grouping=None, data=None, plot=True, scale=None,
sort=None, parametric=None, multi_test_corr='bonferroni', **kwa):
"""Perform univariate analysis of one or more variables
Parameters
----------
variables : str, list of str
Name or list of names of DataFrame columns to be analyzed.
grouping : str
Name of column that will be used for grouping of observations.
data : DataFrame
A DataFrame containing all analyzed variables and grouping variable.
plot : bool, default=True
Plot a graphical summary.
summary : bool, default=False
Include summary of the groups in output. Useful for Table 1 creation.
scale : {'binary', 'categorical', 'continuous'}, optional
Scale of the variable (for plots). If not given it is guessed from data.
parametric : bool, optional
Force parametric or non-parametric tests. By default it is decided
by the Shapiro-Wilk test of normality.
multi_test_corr : str, default='bonferroni'
Method of multiple testing correction of the p-values. Possibilities
include: 'bonferroni', 'sidak', 'holm-sidak', 'holm' and other. Please
see documentation to `statsmodels.stats.multitest.multipletests`.
Returns
-------
DataFrame with results of the comparisons containing columns:
formula : R-style description of the model
scale : scale of the variable
test : the statistical test used
p : the p-value
If the `summary` parameter is true, all groups and their summaries are
included. This is useful for Table 1 creation of clinical trials.
"""
if type(variables) == str:
variables = [variables]
if hasattr(scale, '__iter__') and type(scale) != str:
scale = list(scale) + [None] * (len(variables) - len(scale))
else:
scales = [None] * len(var_list)
scale = [scale] * len(variables)
if hasattr(parametric, '__iter__') and type(parametric) != str:
parametric = list(parametric) + [None] * (len(variables) - len(parametric))
else:
parametric = [parametric] * len(variables)
ll = []
orig_mow = plt.rcParams['figure.max_open_warning'] = 0
for var, scale in zip(var_list, scales):
for var, sc, par in zip(variables, scale, parametric):
if var != grouping:
res = compare_one(var, grouping, data=data, plot=plot, scale=scale, **kwa)
res = compare_one(var, grouping, data=data, plot=plot, scale=sc,
parametric=par, **kwa)
ll.append(res)
plt.rcParams['figure.max_open_warning'] = orig_mow
res_df = pd.DataFrame(ll)
p_corr = sm.stats.multipletests(res_df['p'], method=multi_test_corr)[1]
res_df['p_corr'] = p_corr
if sort:
res_df = res_df.sort_values(sort)
return res_df

def compare_one(var, grouping=None, data=None, plot=True, scale=None, parametric=None, **kwa):
def compare_one(var, grouping=None, data=None, plot=True, summary=False,
scale=None, parametric=None, **kwa):
"""Describe and compare one grouped variable
Parameters
Expand All @@ -51,6 +92,8 @@ def compare_one(var, grouping=None, data=None, plot=True, scale=None, parametric
containing these columns must be given.
plot : bool, default=True
Plot graphical summary.
summary : bool, default=False
Include summary of the groups in output. Useful for Table 1 creation.
scale : {'binary', 'categorical', 'continuous'}, optional
Scale of the variable (for plots). If not given it is guessed from data.
parametric : bool, optional
Expand All @@ -65,6 +108,8 @@ def compare_one(var, grouping=None, data=None, plot=True, scale=None, parametric
scale : scale of the variable
test : the statistical test used
p : the p-value
If the `summary` parameter is true, all groups and their summaries are
included. This is useful for Table 1 creation of clinical trials.
"""
if type(var) == list:
var = data[var]
Expand All @@ -73,7 +118,7 @@ def compare_one(var, grouping=None, data=None, plot=True, scale=None, parametric
if not scale:
scale = _guess_scale(var.values.flatten())
if scale == 'binary':
res = paired_proportion_test(var,plot=plot, scale=scale, **kwa)
res = paired_proportion_test(var, plot=plot, scale=scale, **kwa)
elif scale == 'categorical' or scale == 'continuous':
res = paired_difference_test(var, plot=plot, scale=scale, **kwa)
else:
Expand All @@ -84,16 +129,16 @@ def compare_one(var, grouping=None, data=None, plot=True, scale=None, parametric
grouping = _get_series(grouping, data)
if not scale:
scale = _guess_scale(var)

if scale == 'binary':
res = independent_proportion_test(var, grouping, plot=plot, scale=scale, **kwa)
res = independent_proportion_test(var, grouping, plot=plot, summary=summary, scale=scale, **kwa)
elif scale == 'categorical' or scale == 'continuous':
res = independent_difference_test(var, grouping, plot=plot, scale=scale, parametric=parametric, **kwa)
res = independent_difference_test(var, grouping, plot=plot,
summary=summary, scale=scale, parametric=parametric, **kwa)
else:
raise Exception(f'Unknown scale: {scale}')
return res

def independent_difference_test(var, grouping, plot=True, scale=None, parametric=None, **kwa):
def independent_difference_test(var, grouping, plot=True, summary=False, scale=None, parametric=None, **kwa):
res = {'formula': f'{var.name} ~ {grouping.name}', 'scale': scale, 'test': None, 'p': np.nan}
na_loc, var_nona, grp_nona, g_names, gg, g_missing = _split_to_groups(var, grouping)
n_groups = len(gg)
Expand All @@ -118,7 +163,7 @@ def independent_difference_test(var, grouping, plot=True, scale=None, parametric
distribution = 'normal'
elif np.all(possibly_lognormal):
distribution = 'lognormal'
warnings.warn(f'{var.name}: all groups possibly lognormal. Tests not implemented, yet!')
warnings.warn(f'Variable "{var.name}" might have lognormal distribution.')
else:
distribution = None
if parametric is None:
Expand Down Expand Up @@ -153,7 +198,8 @@ def independent_difference_test(var, grouping, plot=True, scale=None, parametric
tests_style.append(['', 'fc_pink' if p_t < ALPHA else ''])

# Mann-Whitney U test
s_mw, p_mw = stats.mannwhitneyu(gg[0], gg[1], alternative='two-sided', use_continuity=scale=='categorical')
s_mw, p_mw = stats.mannwhitneyu(gg[0], gg[1], alternative='two-sided',
use_continuity=scale=='categorical')
tests.append(['Mann-Whitney', p_mw])
tests_style.append(['', 'fc_pink' if p_mw < ALPHA else ''])

Expand All @@ -176,7 +222,15 @@ def independent_difference_test(var, grouping, plot=True, scale=None, parametric
res['test'], res['p'] = 'ANOVA', p_a
else:
res['test'], res['p'] = 'Kruskal-Wallis', p_kw


if summary:
for g, g_name in zip(gg, g_names):
if parametric:
res[g_name] = format_float(np.mean(g)) + ' ±' + format_float(np.std(g, ddof=1))
else:
p25, p50, p75 = np.percentile(g, [25, 50, 75], interpolation='midpoint')
res[g_name] = f'{format_float(p50)} ({format_float(p25)}, {format_float(p75)})'

if plot:
table_0 = [
[None] + list(g_names) + ['total'],
Expand Down Expand Up @@ -267,17 +321,17 @@ def paired_difference_test(measurements, plot=True, scale=None, parametric=None,
tests.append(['paired t', p_t])
tests_style.append(['', 'fc_pink' if p_t < ALPHA else ''])

# Wilcoxon rank-sum test
s_rs, p_rs = stats.ranksums(*mm_nona_list, alternative='two-sided')
tests.append(['rank-sum', p_rs])
# Wilcoxon signed-rank test
s_rs, p_rs = stats.wilcoxon(*mm_nona_list, alternative='two-sided')
tests.append(['signed-rank', p_rs])
tests_style.append(['', 'fc_pink' if p_rs < ALPHA else ''])
if parametric is None:
parametric = distribution == 'normal'

if parametric:
res['test'], res['p'] = 'paired t', p_t
else:
res['test'], res['p'] = 'rank-sum', p_rs
res['test'], res['p'] = 'signed-rank', p_rs
else:
res['test'], res['p'] = 'ANOVA', None
warnings.warn(f'ANOVA not implemented, yet!')
Expand Down Expand Up @@ -327,7 +381,7 @@ def paired_difference_test(measurements, plot=True, scale=None, parametric=None,



def independent_proportion_test(var, grouping, plot=True, scale=None, **kwa):
def independent_proportion_test(var, grouping, plot=True, summary=False, scale=None, **kwa):
res = {'formula': f'{var.name} ~ {grouping.name}', 'scale': scale, 'test': None, 'p': np.nan}
na_loc, var_nona, grp_nona, g_names, gg, g_missing = _split_to_groups(var, grouping)
n_groups = len(gg)
Expand All @@ -350,7 +404,9 @@ def independent_proportion_test(var, grouping, plot=True, scale=None, **kwa):
tests.append([r'$\chi^2$ Yates', p])
tests_style.append([chi2_valid, 'fc_pink' if p < ALPHA else ''])

if counts.shape == (2, 2):
if counts.shape[1] < 2:
pass
elif counts.shape == (2, 2):
test = 'Fisher exact'
oddsratio, p = stats.fisher_exact(counts, alternative='two-sided')
res['test'], res['p'] = test, p
Expand All @@ -364,17 +420,27 @@ def independent_proportion_test(var, grouping, plot=True, scale=None, **kwa):
res['test'], res['p'] = test, p
tests.append([test, p])
tests_style.append(['', 'fc_pink' if p < ALPHA else ''])
else:
# There is just single value. Let's add complementary binary value.
val = counts.iloc[0].name
complementary = {'0':1, '1':0, 'True':False, 'False':True}.get(str(val))
if complementary is not None:
tcounts = counts.T
tcounts[complementary] = 0
counts = tcounts.T.sort_index()
if summary:
for g_name, count in counts.iteritems():
res[str(g_name)] = f'{count.iloc[-1]} ({count.iloc[-1] / count.sum() * 100:2.0f}%)'
if plot:
table_0 = [[''] + list(g_names) + ['total']]
style_0 = [[None] + [f'fc_C{x}' for x in range(counts.shape[1])] + ['normal']]
sums = counts.sum()
total = sums.sum()
warn = lambda x: 'fc_pink' if x < 5 else ''
perc = lambda x: f'{x} ({x / total * 100:2.0f}%)'
for val, row in counts.iterrows():
table_0.append([val] + [perc(x) for x in row] + [perc(row.sum())])
table_0.append([val] + [_perc(x, col_total) for x, col_total in zip(row, sums)] + [_perc(row.sum(), total)])
style_0.append([''] + ['right ' + warn(x) for x in row] + ['right'])
table_0.append(['total'] + [perc(x) for x in sums] + [perc(total)])
table_0.append(['total'] + [_perc(x, total) for x in sums] + [_perc(total, total)])
style_0.append([''] + ['right' for x in sums] + ['right'])
table_0.append(['missing'] + [x for x in g_missing] + [sum(g_missing)])
style_0.append([''] + ['right' for x in g_missing] + ['right'])
Expand All @@ -398,8 +464,8 @@ def independent_proportion_test(var, grouping, plot=True, scale=None, **kwa):
plt.show()
return res

# def _perc(x, arr):
# return f'{x} ({x / total * 100:2.0f}%)'
def _perc(x, total):
return f'{x} ({x / total * 100:2.0f}%)'


r_stats = None
Expand Down Expand Up @@ -472,7 +538,7 @@ def plot_table(cells, style=None, global_style=None, colWidths=None,
text = cell
ckw['loc'] = 'left'
elif isinstance(cell, float):
text = f'{cell:#.2f}' if 1 <= cell < 10000 else f'{cell:#.2g}'
text = format_float(cell)
ckw['loc'] = 'right'
elif isinstance(cell, (int, np.integer)):
text = str(cell)
Expand Down Expand Up @@ -642,12 +708,49 @@ def ci_mean_lognormal(x, level=0.95):
'min' : np.exp(ln_mean - hci),
'max' : np.exp(ln_mean + hci)}

def format_p(p):
if p is None:
def format_p(p, style='NEJM'):
"""
According to NEJM statistical guidelines for authors (A.1.g):
In general, P values larger than 0.01 should be reported to two decimal
places, and those between 0.01 and 0.001 to three decimal places;
P values smaller than 0.001 should be reported as P<0.001.
Notable exceptions to this policy include P values arising from tests
associated with stopping rules in clinical trials or from genomewide
association studies.
"""
if hasattr(p, '__iter__') and type(p) != str:
return [format_p(p_i) for p_i in p]
if pd.isna(p):
return ''
elif p == 1.0:
return '1.0'
elif p < 0.001:
return '<0.001'
if p > 1.0:
raise ValueError(f'P value cannot be > 1.0. Received {p:.2f}.')
if style == 'NEJM':
if p == 1.0: return '1.0'
elif p > 0.01: return f'{p:.2f}'
elif p > 0.001: return f'{p:.3f}'
else: return '<0.001'
else:
if p == 1.0:
return '1.0'
elif p < 0.001:
return '<0.001'
else:
return f'{p:.{2 if p > 0.2 else 3}f}'

def format_float(x, precision=2):
if 1 <= x < 10000:
return f'{x:#.{precision}f}'
else:
return f'{x:#.{precision}g}'

def stars(p):
if hasattr(p, '__iter__') and type(p) != str:
return [stars(p_i) for p_i in p]
if p < 0.001:
return '***'
elif p < 0.01:
return '**'
elif p < 0.05:
return '*'
else:
return f'{p:.{2 if p > 0.2 else 3}f}'#.lstrip('0')
return ''

0 comments on commit a3a62cb

Please sign in to comment.