feat: add Delta Method Analysis #208

Merged 16 commits on Dec 17, 2024
1 change: 1 addition & 0 deletions README.md
@@ -263,6 +263,7 @@ The library offers the following classes:
* `PairedTTestClusteredAnalysis`: to run a paired t-test on aggregated data for clusters
* `ClusteredOLSAnalysis`: to run OLS analysis on the results of a clustered design
* `OLSAnalysis`: to run OLS analysis for non-clustered data
* `DeltaMethodAnalysis`: to run Delta Method Analysis for clustered designs
* `TargetAggregation`: to add pre-experimental data of the outcome to reduce variance
* `SyntheticControlAnalysis`: to run synthetic control analysis
* Regarding experiment analysis workflow:
2 changes: 2 additions & 0 deletions cluster_experiments/__init__.py
@@ -1,6 +1,7 @@
from cluster_experiments.cupac import EmptyRegressor, TargetAggregation
from cluster_experiments.experiment_analysis import (
ClusteredOLSAnalysis,
DeltaMethodAnalysis,
ExperimentAnalysis,
GeeExperimentAnalysis,
MLMExperimentAnalysis,
@@ -45,6 +46,7 @@
__all__ = [
"ExperimentAnalysis",
"GeeExperimentAnalysis",
"DeltaMethodAnalysis",
"OLSAnalysis",
"BinaryPerturbator",
"Perturbator",
189 changes: 186 additions & 3 deletions cluster_experiments/experiment_analysis.py
@@ -1,4 +1,5 @@
import logging
import warnings
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Dict, List, Optional
@@ -7,10 +8,10 @@
import pandas as pd
import statsmodels.api as sm
from pandas.api.types import is_numeric_dtype
from scipy.stats import ttest_ind, ttest_rel
from scipy.stats import norm, ttest_ind, ttest_rel

from cluster_experiments.synthetic_control_utils import get_w
from cluster_experiments.utils import HypothesisEntries
from cluster_experiments.utils import HypothesisEntries, ModelResults


@dataclass
@@ -1147,7 +1148,8 @@ def pvalue_based_on_hypothesis(

def _get_treatment_cluster(self, df: pd.DataFrame) -> str:
"""Returns the first treatment cluster. The current implementation of Synthetic Control only accepts one treatment cluster.
This will be left inside the Synthetic Control class because it doesn't apply to other analyses.
"""
treatment_df = df[df[self.treatment_col] == 1]
treatment_cluster = self._get_cluster_column(treatment_df).unique()[0]
return treatment_cluster
@@ -1204,3 +1206,184 @@ def _split_pre_experiment_df(self, df: pd.DataFrame):
pre_experiment_df = df[(df[self.time_col] <= self.intervention_date)]
df = df[(df[self.time_col] > self.intervention_date)]
return df, pre_experiment_df


class DeltaMethodAnalysis(ExperimentAnalysis):
def __init__(
self,
cluster_cols: Optional[List[str]] = None,
target_col: str = "target",
scale_col: str = "scale",
treatment_col: str = "treatment",
treatment: str = "B",
covariates: Optional[List[str]] = None,
hypothesis: str = "two-sided",
):
"""
Class to run the Delta Method approximation for estimating the treatment effect on a ratio metric (target/scale) under a clustered design.
The analysis is done on the aggregated data at the cluster level, making computation more efficient.

Arguments:
cluster_cols: list of columns to use as clusters. Not available for the CUPED method.
target_col: name of the column containing the variable to measure (the numerator of the ratio).
scale_col: name of the column containing the scale variable (the denominator of the ratio).
treatment_col: name of the column containing the treatment variable.
treatment: name of the treatment to use as the treated group.
covariates: list of columns to use as covariates.
hypothesis: one of "two-sided", "less", "greater" indicating the alternative hypothesis.

Usage:
```python
import pandas as pd

from cluster_experiments.experiment_analysis import DeltaMethodAnalysis

df = pd.DataFrame({
'x': [1, 2, 3, 0, 0, 1] * 2,
'y': [2, 2, 5, 1, 1, 1] * 2,
'treatment': ["A"] * 6 + ["B"] * 6,
'cluster': [1, 2, 3, 1, 2, 3] * 2,
})

DeltaMethodAnalysis(
cluster_cols=['cluster'],
target_col='x',
scale_col='y'
).get_pvalue(df)
```
"""

super().__init__(
target_col=target_col,
treatment_col=treatment_col,
cluster_cols=cluster_cols,
treatment=treatment,
covariates=covariates,
hypothesis=hypothesis,
)
self.scale_col = scale_col
self.cluster_cols = cluster_cols or []

if covariates is not None:
warnings.warn(
"Covariates are not supported in the Delta Method approximation for the time being. They will be ignored."
)
if cluster_cols is None:
raise ValueError(
"cluster_cols must be provided for the Delta Method analysis"
)

def _aggregate_to_cluster(self, df: pd.DataFrame) -> pd.DataFrame:
"""
Returns an aggregated dataframe of the target and scale variables at the cluster (and treatment) level.

Arguments:
df: dataframe containing the data to analyze
"""
group_cols = self.cluster_cols + [self.treatment_col]
aggregate_df = df.groupby(by=group_cols, as_index=False).agg(
{self.target_col: "sum", self.scale_col: "sum"}
)
return aggregate_df

def _get_group_mean_and_variance(self, df: pd.DataFrame) -> tuple[float, float]:
"""
Returns the mean and variance of the ratio metric (target/scale) as estimated by the delta method for a given group (treatment).

Arguments:
df: dataframe containing the data to analyze.
"""
df = self._aggregate_to_cluster(df)
group_size = len(df)

if group_size < 1000:
self.__warn_small_group_size()

target_mean, scale_mean = df.loc[:, [self.target_col, self.scale_col]].mean()
target_variance, scale_variance = df.loc[
:, [self.target_col, self.scale_col]
].var()
target_sum, scale_sum = df.loc[:, [self.target_col, self.scale_col]].sum()

target_scale_cov = df.loc[:, self.target_col].cov(df.loc[:, self.scale_col])

group_mean = target_sum / scale_sum
group_variance = (
(1 / (scale_mean**2)) * target_variance
+ (target_mean**2) / (scale_mean**4) * scale_variance
- (2 * target_mean) / (scale_mean**3) * target_scale_cov
) / group_size
return group_mean, group_variance

def _get_mean_standard_error(self, df: pd.DataFrame) -> tuple[float, float]:
"""
Returns the difference of the ratio metric (target/scale) between treatment and control, together with its standard error, as estimated by the Delta Method. Covariates are currently ignored.
"""

is_treatment = df[self.treatment_col] == 1
treat_mean, treat_var = self._get_group_mean_and_variance(df[is_treatment])
ctrl_mean, ctrl_var = self._get_group_mean_and_variance(df[~is_treatment])

mean_diff = treat_mean - ctrl_mean
standard_error = np.sqrt(treat_var + ctrl_var)

return mean_diff, standard_error

def analysis_pvalue(self, df: pd.DataFrame) -> float:
"""
Returns the p-value of the analysis.

Arguments:
df: dataframe containing the data to analyze.
"""

mean_diff, standard_error = self._get_mean_standard_error(df)

z_score = mean_diff / standard_error
p_value = 2 * (1 - norm.cdf(abs(z_score)))

results_delta = ModelResults(
params={self.treatment_col: mean_diff},
pvalues={self.treatment_col: p_value},
)

p_value = self.pvalue_based_on_hypothesis(results_delta)

return p_value

def analysis_point_estimate(self, df: pd.DataFrame) -> float:
"""Returns the point estimate of the analysis
Arguments:
df: dataframe containing the data to analyze
"""
mean_diff, _standard_error = self._get_mean_standard_error(df)
return mean_diff

def analysis_standard_error(self, df: pd.DataFrame) -> float:
"""Returns the standard error of the analysis
Arguments:
df: dataframe containing the data to analyze
"""
_mean_diff, standard_error = self._get_mean_standard_error(df)
return standard_error

@classmethod
def from_config(cls, config):
"""Creates a DeltaMethodAnalysis object from a PowerConfig object"""
return cls(
cluster_cols=config.cluster_cols,
target_col=config.target_col,
scale_col=config.scale_col,
treatment_col=config.treatment_col,
treatment=config.treatment,
hypothesis=config.hypothesis,
)

def __warn_small_group_size(self):
warnings.warn(
"Delta Method approximation may not be accurate for small group sizes"
)
18 changes: 18 additions & 0 deletions cluster_experiments/power_config.py
@@ -6,6 +6,7 @@
from cluster_experiments.cupac import EmptyRegressor, TargetAggregation
from cluster_experiments.experiment_analysis import (
ClusteredOLSAnalysis,
DeltaMethodAnalysis,
GeeExperimentAnalysis,
MLMExperimentAnalysis,
OLSAnalysis,
@@ -38,6 +39,10 @@ class MissingArgumentError(ValueError):
pass


class UnexpectedArgumentError(ValueError):
pass


@dataclass(eq=True)
class PowerConfig:
"""
@@ -105,6 +110,7 @@ class PowerConfig:

# optional mappings
cupac_model: str = ""
scale_col: Optional[str] = None

# Shared
target_col: str = "target"
@@ -196,6 +202,10 @@ def __post_init__(self):
if "segmented" in self.perturbator:
self._raise_error_if_missing("segment_cols", "perturbator")

if "delta" not in self.analysis:
if self.scale_col is not None:
self._raise_error_if_present("scale_col", "analysis")

def _are_different(self, arg1, arg2) -> bool:
return arg1 != arg2

@@ -214,6 +224,13 @@ def _raise_error_if_missing(self, attr, other_attr):
f"{other_attr} = {getattr(self, other_attr)}."
)

def _raise_error_if_present(self, attr, other_attr):
if getattr(self, attr) is not None:
raise UnexpectedArgumentError(
f"{attr} is not expected when using "
f"{other_attr} = {getattr(self, other_attr)}."
)


perturbator_mapping = {
"binary": BinaryPerturbator,
@@ -254,6 +271,7 @@ def _raise_error_if_missing(self, attr, other_attr):
"ttest_clustered": TTestClusteredAnalysis,
"paired_ttest_clustered": PairedTTestClusteredAnalysis,
"mlm": MLMExperimentAnalysis,
"delta": DeltaMethodAnalysis,
Owner: also, you're adding it here but not adding the scale parameter in here. I think we need to add it if we want to init via config. I would also test init via config, I think it's missing.

Collaborator (Author): I am not sure what you mean here. I get that it might be tricky to know that you have to pass an additional parameter or that the "scale" column has to appear somewhere. Is this what you are referring to? If that is the case, do you have any recommendation on how to work around this? I am not sure I understand how to do it properly...

Owner: It needs to work like this:

config = {
    "analysis": "delta",
    "perturbator": "constant",
    "splitter": "non_clustered",
    "n_simulations": 50,
    "scale_col": "y",
    "target_col": "x"
}
pw = PowerAnalysis.from_dict(config)
power = pw.power_analysis(df, average_effect=0.1)

I think we need to pass the scale_col in the from_config. Please have a read of the from_config in PowerAnalysis, ExperimentAnalysis, etc., and if you don't know how it's working, let me know.

Collaborator (Author): It was pretty clear, thanks! I think that now it should be working. I had to make some changes and raise a couple of additional errors as safeguards. When time allows, could you check again and give more feedback? Thanks!
}

cupac_model_mapping = {"": EmptyRegressor, "mean_cupac_model": TargetAggregation}
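The validation added to `PowerConfig.__post_init__` rejects a `scale_col` when the chosen analysis is not delta-based. A minimal sketch of that guard pattern, using a simplified stand-in dataclass rather than the real `PowerConfig`, looks like this:

```python
from dataclasses import dataclass
from typing import Optional

class UnexpectedArgumentError(ValueError):
    pass

@dataclass
class MiniConfig:
    """Simplified stand-in for PowerConfig's scale_col validation."""
    analysis: str
    scale_col: Optional[str] = None

    def __post_init__(self):
        # scale_col is only meaningful for the delta-method analysis
        if "delta" not in self.analysis and self.scale_col is not None:
            raise UnexpectedArgumentError(
                f"scale_col is not expected when using analysis = {self.analysis}."
            )

MiniConfig(analysis="delta", scale_col="y")   # accepted
try:
    MiniConfig(analysis="ols", scale_col="y")  # rejected
    raised = False
except UnexpectedArgumentError:
    raised = True
```

Note the guard must fire when the attribute is present (`is not None`); a check against `is None` would silently accept the misconfiguration.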
7 changes: 7 additions & 0 deletions cluster_experiments/utils.py
@@ -1,4 +1,5 @@
from enum import Enum
from typing import Dict


def _original_time_column(time_col: str) -> str:
@@ -26,3 +27,9 @@ class HypothesisEntries(Enum):
TWO_SIDED = "two-sided"
LESS = "less"
GREATER = "greater"


class ModelResults:
def __init__(self, params: Dict, pvalues: Dict):
self.params = params
self.pvalues = pvalues
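The `ModelResults` container carries the point estimate and a two-sided p-value into `pvalue_based_on_hypothesis`. One standard convention for mapping a z-statistic to the three alternatives is sketched below with only the standard library (the library itself derives one-sided values from the two-sided p-value and the estimate's sign, which agrees with this for the symmetric normal); the helper names are illustrative:

```python
from math import erf, sqrt

def norm_cdf(z: float) -> float:
    """Standard normal CDF via the error function (stdlib only)."""
    return 0.5 * (1.0 + erf(z / sqrt(2.0)))

def pvalue(z: float, hypothesis: str = "two-sided") -> float:
    """p-value of a z-statistic under the chosen alternative hypothesis."""
    if hypothesis == "two-sided":
        return 2.0 * (1.0 - norm_cdf(abs(z)))
    if hypothesis == "less":
        return norm_cdf(z)
    if hypothesis == "greater":
        return 1.0 - norm_cdf(z)
    raise ValueError(f"unknown hypothesis: {hypothesis}")

p_two = pvalue(1.96)           # roughly 0.05
p_less = pvalue(1.96, "less")  # roughly 0.975
```

The two-sided branch is exactly the expression used in `DeltaMethodAnalysis.analysis_pvalue` with `scipy.stats.norm`.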