feat(missings): added missings mvp (#3)
Features:
- High occurrence of nulls
- Missing values correlations
- Missing values predictor
- Performance drop
- Store / get warning (core)
UrbanoFonseca authored Jul 8, 2021
1 parent b0a3556 commit 6a95c37
Showing 8 changed files with 696 additions and 3 deletions.
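For context, a minimal usage sketch of the MissingsProfiler introduced by this commit (hypothetical toy DataFrame; class, method, and warning names are taken from the diff below):

import numpy as np
import pandas as pd

from ydata_quality.missings import MissingsProfiler

# Hypothetical data: two features whose values are missing together, plus a binary target.
df = pd.DataFrame({
    'age':    [25, np.nan, 40, np.nan, 33, 51],
    'income': [50000, np.nan, 61000, np.nan, 42000, 58000],
    'label':  [0, 1, 0, 1, 0, 1],
})

profiler = MissingsProfiler(df=df, target='label')
profiler.null_count(normalize=True)          # missing value ratio per column
profiler.nulls_higher_than(th=0.2)           # columns above the 20% threshold (stores a warning)
profiler.high_missing_correlations(th=0.5)   # feature pairs with correlated missingness
profiler.get_warnings(category='Missings')   # warnings stored by the checks above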
1 change: 1 addition & 0 deletions requirements.txt
@@ -1,2 +1,3 @@
pandas==1.2.*
pydantic==1.8.2
scikit-learn==0.24.2
20 changes: 20 additions & 0 deletions src/ydata_quality/core/engine.py
@@ -2,7 +2,12 @@
Implementation of abstract class for Data Quality engines.
"""
from abc import ABC
from typing import Optional

import pandas as pd
from ydata_quality.core import QualityWarning
from ydata_quality.core.warnings import Priority


class QualityEngine(ABC):
"Main class for running and storing data quality analysis."
@@ -22,6 +27,21 @@ def warnings(self):
"Storage of all detected data quality warnings."
return self._warnings

def store_warning(self, warning: QualityWarning):
"Adds a new warning to the internal 'warnings' storage."
self._warnings.add(warning)

def get_warnings(self,
category: Optional[str] = None,
test: Optional[str] = None,
priority: Optional[Priority] = None):
"Retrieves warnings filtered by their properties."
filtered = self.warnings # original set
filtered = [w for w in filtered if w.category == category] if category else filtered
filtered = [w for w in filtered if w.test == test] if test else filtered
filtered = [w for w in filtered if w.priority == Priority(priority)] if priority else filtered
return set(filtered)

@property
def tests(self):
"List of individual tests available for the data quality checks."
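A hedged usage sketch of the warning retrieval added above (profiler is a hypothetical instance of a QualityEngine subclass, e.g. the MissingsProfiler introduced in this commit; the filter values are taken from the missings checks below):

profiler.get_warnings()                                  # no filters: the full warning set
profiler.get_warnings(category='Missings')               # only the missing values test suite
profiler.get_warnings(test='High Missings', priority=3)  # filters combine with a logical AND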
6 changes: 3 additions & 3 deletions src/ydata_quality/core/warnings.py
@@ -36,10 +36,10 @@ def __str__(self):
class QualityWarning(BaseModel):
""" Details for issues detected during data quality analysis.
-category: name of the test suite (e.g. 'Exact Duplicates')
-test: name of the individual test
+category: name of the test suite (e.g. 'Duplicates')
+test: name of the individual test (e.g. 'Exact Duplicates')
description: long-text description of the results
-priority: expected impact of data quality issue
+priority: expected impact of data quality warning
data: sample data
"""

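A hedged construction example consistent with the updated field descriptions (the integer priority is assumed to be coerced to the Priority enum by pydantic, as the missings checks below rely on; the sample data is hypothetical):

import pandas as pd

from ydata_quality.core import QualityWarning

high_ratios = pd.Series({'age': 0.33, 'income': 0.33})  # hypothetical sample data
warning = QualityWarning(
    category='Missings', test='High Missings', priority=3,
    description='Found 2 columns with more than 20.0% of missing values.',
    data=high_ratios,
)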
8 changes: 8 additions & 0 deletions src/ydata_quality/missings/__init__.py
@@ -0,0 +1,8 @@
"""
Tools to guarantee data quality on missing values.
"""
from ydata_quality.missings.engine import MissingsProfiler

__all__ = [
"MissingsProfiler"
]
180 changes: 180 additions & 0 deletions src/ydata_quality/missings/engine.py
@@ -0,0 +1,180 @@
"""
Implementation of the MissingsProfiler engine to run missing value analysis.
"""
from typing import List, Optional, Union

import numpy as np
import pandas as pd

from ydata_quality.core import QualityEngine, QualityWarning
from ydata_quality.utils.modelling import (baseline_performance,
performance_per_missing_value,
predict_missingness)


class MissingsProfiler(QualityEngine):
"Main class to run missing value analysis."

def __init__(self, df: pd.DataFrame, target: Optional[str] = None):
"""
Args:
df (pd.DataFrame): reference DataFrame used to run the missing value analysis.
target (str, optional): name of the column to be used as the prediction target in the performance-based tests.
"""
super().__init__(df=df)
self._target = target
self._tests = ["nulls_higher_than", "high_missing_correlations", "predict_missings"]

@property
def target(self):
return self._target

@target.setter
def target(self, target: str):
if target not in self.df.columns:
raise ValueError(f'Provided target ({target}) must belong to the dataframe columns ({list(self.df.columns)}).')
self._target = target

def _get_null_cols(self, col: Optional[str] = None) -> List[str]:
"Returns list of given column or all columns with null values in DataFrame if None."
return list(self.df.columns[self.null_count(minimal=False)>0]) if col is None \
else col if isinstance(col, list) \
else [col]

def __get_prediction_type(self):
"Decide whether to use classification or regression setting, based on target."
# TODO: Improve prediction type guesstimate based on alternative heuristics (e.g. dtypes, value_counts)
if len(set(self.df[self.target])) == 2: # binary classification
return 'classification'
else:
return 'regression'

def null_count(self, col: Union[List[str], str, None] = None, normalize=False, minimal=True):
"""Returns the count of null values.
Args:
col (Union[List[str], str], optional): name of the column(s) for which to count nulls. If None, counts nulls for the full DataFrame.
normalize (bool): flag to return null counts as a proportion of total rows. Defaults to False.
minimal (bool): flag to drop columns without nulls when counting over all columns. Defaults to True.
"""
# if col is not provided, calculate for full dataset
count = self.df.isnull().sum() if col is None else self.df[col].isnull().sum()
# if normalize, return as percentage of total rows
count = count / len(self.df) if normalize else count
# when counting over all columns with minimal, keep only columns that have nulls
if col is None and minimal:
count = count[count>0]
return count

def nulls_higher_than(self, th=0.2):
"Returns the list of columns with higher missing value percentage than the defined threshold."
ratios = self.null_count(col=None, normalize=True)
high_ratios = ratios[ratios >= th]
if len(high_ratios) > 0:
self.store_warning(
QualityWarning(
test='High Missings', category='Missings', priority=3, data=high_ratios,
description=f"Found {len(high_ratios)} columns with more than {th*100:.1f}% of missing values."
)
)
else:
high_ratios = None
return high_ratios

def missing_correlations(self):
"""Calculate the correlations between missing values in feature values.
# TODO: Replace standard correlation coefficient by Cramer's V / Theil's U.
"""
nulls = self.df.loc[:, self.null_count(minimal=False) > 0] # drop columns without nulls
return nulls.isnull().corr()

def high_missing_correlations(self, th: float = 0.5):
"Returns a list of correlation pairs with high correlation of missing values."

corrs = self.missing_correlations().abs() # compute the absolute correlation
np.fill_diagonal(corrs.values, -1) # remove the same column pairs
corrs = corrs[corrs>th].melt(ignore_index=False).reset_index().dropna() # subset by threshold

# TODO: For acyclical correlation measures (e.g. Theil's U), store direction as well

# create the sorted pairs of feature names
corrs['features'] = ['_'.join(sorted((i.index, i.variable))) for i in corrs.itertuples()]
corrs.drop_duplicates('features', inplace=True) # deduplicate combination pairs
corrs.sort_values(by='value', ascending=False, inplace=True) # sort by correlation
corrs = corrs.set_index('features').rename(columns={'value': 'missings_corr'})[['missings_corr']].squeeze() # rename and subset columns

if len(corrs) > 0:
self.store_warning(
QualityWarning(
test='High Missing Correlations', category='Missings', priority=3, data=corrs,
description=f"Found {len(corrs)} feature pairs with correlation "\
f"of missing values higher than defined threshold ({th})."
)
)
return corrs

def performance_drop(self, col: Union[List[str], str, None] = None, normalize=True):
"""Calculate the drop in performance when the feature values of a given column are missing.
Performance is measured by "AU-ROC" for binary classification and "Mean Squared Error" for regression.
Args:
col (Union[List[str], str, None], optional): reference for comparing performances between valued and missing value instances.
If None, calculates performance_drop for all columns with missing values.
normalize (bool): performance as ratio over baseline performance achieved for entire dataset.
"""
# Parse the columns for which to calculate the drop in performance on missings
cols = self._get_null_cols(col)

# Guarantee that target is defined. Otherwise skip the test.
if self.target is None:
print('Argument "target" must be defined to calculate performance_drop metric. Skipping test.')
return

# Guesstimate the prediction type
prediction_type = self.__get_prediction_type()
results = pd.DataFrame({
c: performance_per_missing_value(df=self.df, feature=c, target=self.target, type=prediction_type)
for c in cols
})

# Normalize the results with a baseline performance.
if normalize:
baseline = baseline_performance(df=self.df, target=self.target, type=prediction_type)
results = results / baseline

return results

def predict_missings(self, col: Union[List[str], str, None] = None, th=0.8):
"""Calculates the performance score of a baseline model trained to predict missingness of a specific feature.
Performance is measured on "AU-ROC" for a binary classifier trained to predict occurrence of missing values.
High performances signal that the occurrence of missing values for a specific feature may be impacted by the
feature values of all the remaining features.
Args:
col (Union[List[str], str, None], optional): reference for predicting occurrence of missing values.
If None, calculates predict_missings for all columns with missing values.
th (float): performance threshold to generate a QualityWarning.
"""
# Parse the columns for which to calculate the missingness performance
cols = self._get_null_cols(col)
# Calculate the performance for each feature
results = pd.Series(
{c: predict_missingness(df=self.df, feature=c) for c in cols},
name='predict_missings'
)

# Subset for performances above threshold
high_perfs = results[results > th]

# Generate a QualityWarning if any performance is above the threshold
if len(high_perfs) > 0:
self.store_warning(
QualityWarning(
test='Missingness Prediction', category='Missings', priority=2, data=high_perfs,
description=f'Found {len(high_perfs)} features with prediction performance of missingness above threshold ({th}).'
)
)
return results
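A hedged sketch of the two supervised checks above, on hypothetical synthetic data (numeric features with missing values and a binary 'label' target):

import numpy as np
import pandas as pd

from ydata_quality.missings import MissingsProfiler

rng = np.random.default_rng(42)
df = pd.DataFrame({'x1': rng.normal(size=500), 'x2': rng.normal(size=500)})
df['label'] = (df['x1'] + rng.normal(size=500) > 0).astype(int)
df.loc[rng.random(500) < 0.3, 'x1'] = np.nan  # inject ~30% missing values
df.loc[rng.random(500) < 0.3, 'x2'] = np.nan

profiler = MissingsProfiler(df=df, target='label')

# Performance (ROC AUC here) on rows where each feature is missing vs. valued,
# as a ratio over the baseline performance on the full dataset.
profiler.performance_drop(normalize=True)

# ROC AUC of a baseline classifier predicting each feature's missingness from the
# remaining features; scores above th raise a priority-2 'Missingness Prediction' warning.
profiler.predict_missings(th=0.8)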
Empty file.
116 changes: 116 additions & 0 deletions src/ydata_quality/utils/modelling.py
@@ -0,0 +1,116 @@
"""
Utilities based on building baseline machine learning models.
"""

import pandas as pd
from sklearn.exceptions import ConvergenceWarning
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LinearRegression, LogisticRegression
from sklearn.metrics import mean_squared_error, roc_auc_score
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.utils._testing import ignore_warnings

BASELINE_CLASSIFIER = Pipeline([
('imputer', SimpleImputer()),
('classifier', LogisticRegression())
])

BASELINE_REGRESSION = Pipeline([
('imputer', SimpleImputer()),
('regressor', LinearRegression())
])


@ignore_warnings(category=ConvergenceWarning)
def baseline_predictions(df: pd.DataFrame, target: str, type='classification'):
"Train a baseline model and predict for a test set"

# 1. Define the baseline model
model = BASELINE_CLASSIFIER if type == 'classification' else BASELINE_REGRESSION

# 2. Train overall model
X, y = df.drop(target, axis=1), df[target]
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
model.fit(X_train.select_dtypes('number'), y_train)

# 3. Predict
if type == 'regression':
y_pred = model.predict(X_test.select_dtypes('number'))
elif type == 'classification':
y_pred = model.predict_proba(X_test.select_dtypes('number'))[:, 1]

# 4. Return both the predictions and X_test, y_test to analyze the performances
return y_pred, X_test, y_test

def baseline_performance(df: pd.DataFrame, target: str, type='classification'):
"Train a baseline model, predict for a test set and return the performance."

# 1. Define the baseline performance metric
metric = roc_auc_score if type == 'classification' else mean_squared_error

# 2. Get the baseline predictions
y_pred, _, y_test = baseline_predictions(df=df, target=target, type=type)

# 3. Get the performance
return metric(y_test, y_pred)


def performance_per_feature_values(df: pd.DataFrame, feature: str, target: str, type='classification'):
"""Performance achieved per each value of a groupby feature."""

# 1. Define the baseline performance metric
metric = roc_auc_score if type == 'classification' else mean_squared_error

# 2. Get the baseline predictions
y_pred, X_test, y_test = baseline_predictions(df=df, target=target, type=type)

# 3. Get the performances per feature value
uniques = set(X_test[feature])
results = {}
for i in uniques: # for each category
y_pred_cat = y_pred[X_test[feature]==i]
y_true_cat = y_test[X_test[feature]==i]
results[i] = metric(y_true_cat, y_pred_cat)
return results

def performance_per_missing_value(df: pd.DataFrame, feature: str, target: str, type='classification'):
"""Performance difference between valued and missing values in feature."""

# 1. Define the baseline performance metric
metric = roc_auc_score if type == 'classification' else mean_squared_error

# 2. Get the baseline predictions
y_pred, X_test, y_test = baseline_predictions(df=df, target=target, type=type)

# 3. Get the performance per valued vs missing feature
missing_mask = X_test[feature].isna()
results = {}
results['missing'] = metric(y_test[missing_mask], y_pred[missing_mask])
results['valued'] = metric(y_test[~missing_mask], y_pred[~missing_mask])
return results

@ignore_warnings(category=ConvergenceWarning)
def predict_missingness(df: pd.DataFrame, feature: str):
"Train a baseline model to predict the missingness of a feature value."
# 0. Preprocessing
df = df.copy() # avoid altering the original DataFrame
target = f'is_missing_{feature}'

# 1. Define the baseline model
model = BASELINE_CLASSIFIER

# 2. Create the new target
df[target] = df[feature].isna()

# 3. Train overall model
X, y = df.drop([feature, target], axis=1), df[target]
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
model.fit(X_train.select_dtypes('number'), y_train)

# 4. Predict
y_pred = model.predict_proba(X_test.select_dtypes('number'))[:, 1]

# 5. Return the area under the roc curve
return roc_auc_score(y_test, y_pred)
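A hedged sketch of how these utilities can be called directly (hypothetical synthetic DataFrame; function names and signatures as defined above):

import numpy as np
import pandas as pd

from ydata_quality.utils.modelling import (baseline_performance,
                                           performance_per_missing_value,
                                           predict_missingness)

rng = np.random.default_rng(0)
df = pd.DataFrame({'age': rng.normal(40, 10, size=300), 'income': rng.normal(50000, 10000, size=300)})
df['label'] = (df['age'] > 40).astype(int)
df.loc[rng.random(300) < 0.2, 'age'] = np.nan  # hypothetical missing values

# Baseline ROC AUC of the impute + logistic regression pipeline on a held-out split.
baseline = baseline_performance(df=df, target='label', type='classification')

# Performance computed separately on test rows where 'age' is missing vs. valued.
per_missing = performance_per_missing_value(df=df, feature='age', target='label', type='classification')

# ROC AUC of a classifier trained to predict the missingness of 'age' from the other columns.
missingness_auc = predict_missingness(df=df, feature='age')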
