feat: add detect_anomalies to ml ARIMAPlus and KMeans models (#426)
Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:
- [ ] Make sure to open an issue as a [bug/issue](https://togithub.com/googleapis/python-bigquery-dataframes/issues/new/choose) before writing your code!  That way we can discuss the change, evaluate designs, and agree on the general idea
- [ ] Ensure the tests and linter pass
- [ ] Code coverage does not decrease (if any source code was changed)
- [ ] Appropriate docs were updated (if necessary)

Fixes #<issue_number_goes_here> 🦕
GarrettWu authored Mar 12, 2024
1 parent 8d82945 commit 6df28ed
Showing 6 changed files with 185 additions and 13 deletions.
28 changes: 28 additions & 0 deletions bigframes/ml/cluster.py
@@ -96,6 +96,34 @@ def predict(

return self._bqml_model.predict(X)

def detect_anomalies(
self, X: Union[bpd.DataFrame, bpd.Series], *, contamination: float = 0.1
) -> bpd.DataFrame:
"""Detect the anomaly data points of the input.
Args:
X (bigframes.dataframe.DataFrame or bigframes.series.Series):
Series or DataFrame to detect anomalies in.
contamination (float, default 0.1):
Identifies the proportion of anomalies in the training dataset that is used to create the model.
The value must be in the range [0, 0.5].
Returns:
bigframes.dataframe.DataFrame: DataFrame of the detected anomalies."""
if contamination < 0.0 or contamination > 0.5:
raise ValueError(
f"contamination must be [0.0, 0.5], but is {contamination}."
)

if not self._bqml_model:
raise RuntimeError("A model must be fitted before detect_anomalies")

(X,) = utils.convert_to_dataframe(X)

return self._bqml_model.detect_anomalies(
X, options={"contamination": contamination}
)

def to_gbq(self, model_name: str, replace: bool = False) -> KMeans:
"""Save the model to BigQuery.
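For context, the new `KMeans.detect_anomalies` follows the same pattern as the existing PCA method: validate the option, require a fitted model, coerce the input to a DataFrame, and delegate to the BQML model (presumably BigQuery ML's `ML.DETECT_ANOMALIES` with a `contamination` option). Below is a minimal usage sketch, not part of this change; the public penguins table and the feature columns are illustrative assumptions.

```python
import bigframes.pandas as bpd
from bigframes.ml import cluster

# Illustrative data: any numeric feature DataFrame works.
penguins = bpd.read_gbq("bigquery-public-data.ml_datasets.penguins")
features = penguins[
    ["culmen_length_mm", "culmen_depth_mm", "flipper_length_mm"]
].dropna()

model = cluster.KMeans(n_clusters=3)
model.fit(features)

# New in this commit: rows far from their cluster centroid are flagged.
# The result carries is_anomaly and normalized_distance columns
# (see the expectations in tests/system/small/ml/test_cluster.py below).
anomalies = model.detect_anomalies(features, contamination=0.2)
anomalies.to_pandas().head()
```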
2 changes: 1 addition & 1 deletion bigframes/ml/decomposition.py
@@ -111,7 +111,7 @@ def predict(self, X: Union[bpd.DataFrame, bpd.Series]) -> bpd.DataFrame:
return self._bqml_model.predict(X)

def detect_anomalies(
self, X: Union[bpd.DataFrame, bpd.Series], *, contamination=0.1
self, X: Union[bpd.DataFrame, bpd.Series], *, contamination: float = 0.1
) -> bpd.DataFrame:
"""Detect the anomaly data points of the input.
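The `decomposition.py` change only adds the missing `float` annotation to the existing PCA `detect_anomalies`; behavior is unchanged. A hedged sketch of the call it types, reusing the illustrative `features` frame from the KMeans example above:

```python
from bigframes.ml import decomposition

pca = decomposition.PCA(n_components=2)
pca.fit(features)

# Rows that reconstruct poorly from the principal components are flagged;
# the output carries is_anomaly and mean_squared_error columns
# (matching the expectations in test_decomposition.py below).
pca_anomalies = pca.detect_anomalies(features, contamination=0.2)
```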
30 changes: 30 additions & 0 deletions bigframes/ml/forecasting.py
@@ -119,6 +119,36 @@ def predict(
options={"horizon": horizon, "confidence_level": confidence_level}
)

def detect_anomalies(
self,
X: Union[bpd.DataFrame, bpd.Series],
*,
anomaly_prob_threshold: float = 0.95,
) -> bpd.DataFrame:
"""Detect the anomaly data points of the input.
Args:
X (bigframes.dataframe.DataFrame or bigframes.series.Series):
Series or DataFrame to detect anomalies in.
anomaly_prob_threshold (float, default 0.95):
Identifies the custom threshold to use for anomaly detection. The value must be in the range [0, 1), with a default value of 0.95.
Returns:
bigframes.dataframe.DataFrame: DataFrame of the detected anomalies."""
if anomaly_prob_threshold < 0.0 or anomaly_prob_threshold >= 1.0:
raise ValueError(
f"anomaly_prob_threshold must be [0.0, 1.0), but is {anomaly_prob_threshold}."
)

if not self._bqml_model:
raise RuntimeError("A model must be fitted before detect_anomalies")

(X,) = utils.convert_to_dataframe(X)

return self._bqml_model.detect_anomalies(
X, options={"anomaly_prob_threshold": anomaly_prob_threshold}
)

def score(
self,
X: Union[bpd.DataFrame, bpd.Series],
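For `ARIMAPlus` the option is `anomaly_prob_threshold` rather than `contamination`: a point is flagged when its anomaly probability exceeds the threshold. A minimal sketch under stated assumptions — the `parsed_date`/`total_visits` columns and the synthetic series are illustrative, not from this PR:

```python
import pandas as pd

import bigframes.pandas as bpd
from bigframes.ml import forecasting

# Illustrative time series: daily visit counts (synthetic data).
history = bpd.read_pandas(
    pd.DataFrame(
        {
            "parsed_date": pd.date_range("2024-01-01", periods=60, freq="D"),
            "total_visits": [100 + (i % 7) * 5 for i in range(60)],
        }
    )
)

model = forecasting.ARIMAPlus()
# X is the timestamp column, y is the numeric series to model.
model.fit(history[["parsed_date"]], history[["total_visits"]])

# Points whose anomaly_probability exceeds the threshold get is_anomaly=True;
# lower_bound/upper_bound give the expected range for each timestamp
# (column names match the system tests further below).
anomalies = model.detect_anomalies(history, anomaly_prob_threshold=0.7)
```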
45 changes: 45 additions & 0 deletions tests/system/small/ml/test_cluster.py
@@ -15,6 +15,7 @@
import pandas as pd

from bigframes.ml import cluster
import bigframes.pandas as bpd
from tests.system.utils import assert_pandas_df_equal

_PD_NEW_PENGUINS = pd.DataFrame.from_dict(
@@ -73,6 +74,50 @@ def test_kmeans_predict(session, penguins_kmeans_model: cluster.KMeans):
assert_pandas_df_equal(result, expected, ignore_order=True)


def test_kmeans_detect_anomalies(
penguins_kmeans_model: cluster.KMeans, new_penguins_df: bpd.DataFrame
):
anomalies = penguins_kmeans_model.detect_anomalies(new_penguins_df).to_pandas()
expected = pd.DataFrame(
{
"is_anomaly": [False, False, False],
"normalized_distance": [1.082937, 0.77139, 0.478304],
},
index=pd.Index([1633, 1672, 1690], name="tag_number", dtype="Int64"),
)

pd.testing.assert_frame_equal(
anomalies[["is_anomaly", "normalized_distance"]].sort_index(),
expected,
check_exact=False,
check_dtype=False,
rtol=0.1,
)


def test_kmeans_detect_anomalies_params(
penguins_kmeans_model: cluster.KMeans, new_penguins_df: bpd.DataFrame
):
anomalies = penguins_kmeans_model.detect_anomalies(
new_penguins_df, contamination=0.4
).to_pandas()
expected = pd.DataFrame(
{
"is_anomaly": [True, False, False],
"normalized_distance": [1.082937, 0.77139, 0.478304],
},
index=pd.Index([1633, 1672, 1690], name="tag_number", dtype="Int64"),
)

pd.testing.assert_frame_equal(
anomalies[["is_anomaly", "normalized_distance"]].sort_index(),
expected,
check_exact=False,
check_dtype=False,
rtol=0.1,
)


def test_kmeans_score(session, penguins_kmeans_model: cluster.KMeans):
new_penguins = session.read_pandas(_PD_NEW_PENGUINS)
result = penguins_kmeans_model.score(new_penguins).to_pandas()
23 changes: 23 additions & 0 deletions tests/system/small/ml/test_decomposition.py
@@ -59,6 +59,29 @@ def test_pca_detect_anomalies(
)


def test_pca_detect_anomalies_params(
penguins_pca_model: decomposition.PCA, new_penguins_df: bpd.DataFrame
):
anomalies = penguins_pca_model.detect_anomalies(
new_penguins_df, contamination=0.2
).to_pandas()
expected = pd.DataFrame(
{
"is_anomaly": [False, True, True],
"mean_squared_error": [0.254188, 0.731243, 0.298889],
},
index=pd.Index([1633, 1672, 1690], name="tag_number", dtype="Int64"),
)

pd.testing.assert_frame_equal(
anomalies[["is_anomaly", "mean_squared_error"]].sort_index(),
expected,
check_exact=False,
check_dtype=False,
rtol=0.1,
)


def test_pca_score(penguins_pca_model: decomposition.PCA):
result = penguins_pca_model.score().to_pandas()
expected = pd.DataFrame(
70 changes: 58 additions & 12 deletions tests/system/small/ml/test_forecasting.py
@@ -35,7 +35,9 @@
]


def test_model_predict_default(time_series_arima_plus_model: forecasting.ARIMAPlus):
def test_arima_plus_predict_default(
time_series_arima_plus_model: forecasting.ARIMAPlus,
):
utc = pytz.utc
predictions = time_series_arima_plus_model.predict().to_pandas()
assert predictions.shape == (3, 8)
@@ -63,7 +65,7 @@ def test_model_predict_default(time_series_arima_plus_model: forecasting.ARIMAPlus):
)


def test_model_predict_params(time_series_arima_plus_model: forecasting.ARIMAPlus):
def test_arima_plus_predict_params(time_series_arima_plus_model: forecasting.ARIMAPlus):
utc = pytz.utc
predictions = time_series_arima_plus_model.predict(
horizon=4, confidence_level=0.9
@@ -94,7 +96,55 @@ def test_model_predict_params(time_series_arima_plus_model: forecasting.ARIMAPlus):
)


def test_model_score(
def test_arima_plus_detect_anomalies(
time_series_arima_plus_model: forecasting.ARIMAPlus, new_time_series_df
):
anomalies = time_series_arima_plus_model.detect_anomalies(
new_time_series_df
).to_pandas()

expected = pd.DataFrame(
{
"is_anomaly": [False, False, False],
"lower_bound": [2349.301736, 2153.614829, 1849.040192],
"upper_bound": [3099.642833, 3033.12195, 2858.185876],
"anomaly_probability": [0.757824, 0.322559, 0.43011],
},
)
pd.testing.assert_frame_equal(
anomalies[["is_anomaly", "lower_bound", "upper_bound", "anomaly_probability"]],
expected,
rtol=0.1,
check_index_type=False,
check_dtype=False,
)


def test_arima_plus_detect_anomalies_params(
time_series_arima_plus_model: forecasting.ARIMAPlus, new_time_series_df
):
anomalies = time_series_arima_plus_model.detect_anomalies(
new_time_series_df, anomaly_prob_threshold=0.7
).to_pandas()

expected = pd.DataFrame(
{
"is_anomaly": [True, False, False],
"lower_bound": [2525.5363, 2360.1870, 2086.0609],
"upper_bound": [2923.408256, 2826.54981, 2621.165188],
"anomaly_probability": [0.757824, 0.322559, 0.43011],
},
)
pd.testing.assert_frame_equal(
anomalies[["is_anomaly", "lower_bound", "upper_bound", "anomaly_probability"]],
expected,
rtol=0.1,
check_index_type=False,
check_dtype=False,
)


def test_arima_plus_score(
time_series_arima_plus_model: forecasting.ARIMAPlus, new_time_series_df
):
result = time_series_arima_plus_model.score(
@@ -118,16 +168,14 @@ def test_model_score(
)


def test_model_summary(
time_series_arima_plus_model: forecasting.ARIMAPlus, new_time_series_df
):
def test_arima_plus_summary(time_series_arima_plus_model: forecasting.ARIMAPlus):
result = time_series_arima_plus_model.summary()
assert result.shape == (1, 12)
assert all(column in result.columns for column in ARIMA_EVALUATE_OUTPUT_COL)


def test_model_summary_show_all_candidates(
time_series_arima_plus_model: forecasting.ARIMAPlus, new_time_series_df
def test_arima_plus_summary_show_all_candidates(
time_series_arima_plus_model: forecasting.ARIMAPlus,
):
result = time_series_arima_plus_model.summary(
show_all_candidate_models=True,
@@ -136,7 +184,7 @@ def test_model_summary_show_all_candidates(
assert all(column in result.columns for column in ARIMA_EVALUATE_OUTPUT_COL)


def test_model_score_series(
def test_arima_plus_score_series(
time_series_arima_plus_model: forecasting.ARIMAPlus, new_time_series_df
):
result = time_series_arima_plus_model.score(
@@ -160,9 +208,7 @@ def test_model_score_series(
)


def test_model_summary_series(
time_series_arima_plus_model: forecasting.ARIMAPlus, new_time_series_df
):
def test_arima_plus_summary_series(time_series_arima_plus_model: forecasting.ARIMAPlus):
result = time_series_arima_plus_model.summary()
assert result.shape == (1, 12)
assert all(column in result.columns for column in ARIMA_EVALUATE_OUTPUT_COL)
