From 6df28ed704552ebec7869e1f2034614cb6407098 Mon Sep 17 00:00:00 2001 From: Garrett Wu <6505921+GarrettWu@users.noreply.github.com> Date: Tue, 12 Mar 2024 15:26:16 -0700 Subject: [PATCH] feat: add detect_anomalies to ml ARIMAPlus and KMeans models (#426) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly: - [ ] Make sure to open an issue as a [bug/issue](https://togithub.com/googleapis/python-bigquery-dataframes/issues/new/choose) before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea - [ ] Ensure the tests and linter pass - [ ] Code coverage does not decrease (if any source code was changed) - [ ] Appropriate docs were updated (if necessary) Fixes # 🦕 --- bigframes/ml/cluster.py | 28 +++++++++ bigframes/ml/decomposition.py | 2 +- bigframes/ml/forecasting.py | 30 +++++++++ tests/system/small/ml/test_cluster.py | 45 +++++++++++++ tests/system/small/ml/test_decomposition.py | 23 +++++++ tests/system/small/ml/test_forecasting.py | 70 +++++++++++++++++---- 6 files changed, 185 insertions(+), 13 deletions(-) diff --git a/bigframes/ml/cluster.py b/bigframes/ml/cluster.py index 360ab01453..c294d1f424 100644 --- a/bigframes/ml/cluster.py +++ b/bigframes/ml/cluster.py @@ -96,6 +96,34 @@ def predict( return self._bqml_model.predict(X) + def detect_anomalies( + self, X: Union[bpd.DataFrame, bpd.Series], *, contamination: float = 0.1 + ) -> bpd.DataFrame: + """Detect the anomaly data points of the input. + + Args: + X (bigframes.dataframe.DataFrame or bigframes.series.Series): + Series or a DataFrame to detect anomalies. + contamination (float, default 0.1): + Identifies the proportion of anomalies in the training dataset that are used to create the model. + The value must be in the range [0, 0.5]. + + Returns: + bigframes.dataframe.DataFrame: detected DataFrame.""" + if contamination < 0.0 or contamination > 0.5: + raise ValueError( + f"contamination must be [0.0, 0.5], but is {contamination}." + ) + + if not self._bqml_model: + raise RuntimeError("A model must be fitted before detect_anomalies") + + (X,) = utils.convert_to_dataframe(X) + + return self._bqml_model.detect_anomalies( + X, options={"contamination": contamination} + ) + def to_gbq(self, model_name: str, replace: bool = False) -> KMeans: """Save the model to BigQuery. diff --git a/bigframes/ml/decomposition.py b/bigframes/ml/decomposition.py index 2714664dce..9dc60be78f 100644 --- a/bigframes/ml/decomposition.py +++ b/bigframes/ml/decomposition.py @@ -111,7 +111,7 @@ def predict(self, X: Union[bpd.DataFrame, bpd.Series]) -> bpd.DataFrame: return self._bqml_model.predict(X) def detect_anomalies( - self, X: Union[bpd.DataFrame, bpd.Series], *, contamination=0.1 + self, X: Union[bpd.DataFrame, bpd.Series], *, contamination: float = 0.1 ) -> bpd.DataFrame: """Detect the anomaly data points of the input. diff --git a/bigframes/ml/forecasting.py b/bigframes/ml/forecasting.py index 0c33660475..18380328c7 100644 --- a/bigframes/ml/forecasting.py +++ b/bigframes/ml/forecasting.py @@ -119,6 +119,36 @@ def predict( options={"horizon": horizon, "confidence_level": confidence_level} ) + def detect_anomalies( + self, + X: Union[bpd.DataFrame, bpd.Series], + *, + anomaly_prob_threshold: float = 0.95, + ) -> bpd.DataFrame: + """Detect the anomaly data points of the input. + + Args: + X (bigframes.dataframe.DataFrame or bigframes.series.Series): + Series or a DataFrame to detect anomalies. + anomaly_prob_threshold (float, default 0.95): + Identifies the custom threshold to use for anomaly detection. The value must be in the range [0, 1), with a default value of 0.95. + + Returns: + bigframes.dataframe.DataFrame: detected DataFrame.""" + if anomaly_prob_threshold < 0.0 or anomaly_prob_threshold >= 1.0: + raise ValueError( + f"anomaly_prob_threshold must be [0.0, 1.0), but is {anomaly_prob_threshold}." + ) + + if not self._bqml_model: + raise RuntimeError("A model must be fitted before detect_anomalies") + + (X,) = utils.convert_to_dataframe(X) + + return self._bqml_model.detect_anomalies( + X, options={"anomaly_prob_threshold": anomaly_prob_threshold} + ) + def score( self, X: Union[bpd.DataFrame, bpd.Series], diff --git a/tests/system/small/ml/test_cluster.py b/tests/system/small/ml/test_cluster.py index a9fec0bbce..96066e5fbe 100644 --- a/tests/system/small/ml/test_cluster.py +++ b/tests/system/small/ml/test_cluster.py @@ -15,6 +15,7 @@ import pandas as pd from bigframes.ml import cluster +import bigframes.pandas as bpd from tests.system.utils import assert_pandas_df_equal _PD_NEW_PENGUINS = pd.DataFrame.from_dict( @@ -73,6 +74,50 @@ def test_kmeans_predict(session, penguins_kmeans_model: cluster.KMeans): assert_pandas_df_equal(result, expected, ignore_order=True) +def test_kmeans_detect_anomalies( + penguins_kmeans_model: cluster.KMeans, new_penguins_df: bpd.DataFrame +): + anomalies = penguins_kmeans_model.detect_anomalies(new_penguins_df).to_pandas() + expected = pd.DataFrame( + { + "is_anomaly": [False, False, False], + "normalized_distance": [1.082937, 0.77139, 0.478304], + }, + index=pd.Index([1633, 1672, 1690], name="tag_number", dtype="Int64"), + ) + + pd.testing.assert_frame_equal( + anomalies[["is_anomaly", "normalized_distance"]].sort_index(), + expected, + check_exact=False, + check_dtype=False, + rtol=0.1, + ) + + +def test_kmeans_detect_anomalies_params( + penguins_kmeans_model: cluster.KMeans, new_penguins_df: bpd.DataFrame +): + anomalies = penguins_kmeans_model.detect_anomalies( + new_penguins_df, contamination=0.4 + ).to_pandas() + expected = pd.DataFrame( + { + "is_anomaly": [True, False, False], + "normalized_distance": [1.082937, 0.77139, 0.478304], + }, + index=pd.Index([1633, 1672, 1690], name="tag_number", dtype="Int64"), + ) + + pd.testing.assert_frame_equal( + anomalies[["is_anomaly", "normalized_distance"]].sort_index(), + expected, + check_exact=False, + check_dtype=False, + rtol=0.1, + ) + + def test_kmeans_score(session, penguins_kmeans_model: cluster.KMeans): new_penguins = session.read_pandas(_PD_NEW_PENGUINS) result = penguins_kmeans_model.score(new_penguins).to_pandas() diff --git a/tests/system/small/ml/test_decomposition.py b/tests/system/small/ml/test_decomposition.py index 72fdc6d951..9eb9b25ea1 100644 --- a/tests/system/small/ml/test_decomposition.py +++ b/tests/system/small/ml/test_decomposition.py @@ -59,6 +59,29 @@ def test_pca_detect_anomalies( ) +def test_pca_detect_anomalies_params( + penguins_pca_model: decomposition.PCA, new_penguins_df: bpd.DataFrame +): + anomalies = penguins_pca_model.detect_anomalies( + new_penguins_df, contamination=0.2 + ).to_pandas() + expected = pd.DataFrame( + { + "is_anomaly": [False, True, True], + "mean_squared_error": [0.254188, 0.731243, 0.298889], + }, + index=pd.Index([1633, 1672, 1690], name="tag_number", dtype="Int64"), + ) + + pd.testing.assert_frame_equal( + anomalies[["is_anomaly", "mean_squared_error"]].sort_index(), + expected, + check_exact=False, + check_dtype=False, + rtol=0.1, + ) + + def test_pca_score(penguins_pca_model: decomposition.PCA): result = penguins_pca_model.score().to_pandas() expected = pd.DataFrame( diff --git a/tests/system/small/ml/test_forecasting.py b/tests/system/small/ml/test_forecasting.py index 4726d5ab21..7fef189550 100644 --- a/tests/system/small/ml/test_forecasting.py +++ b/tests/system/small/ml/test_forecasting.py @@ -35,7 +35,9 @@ ] -def test_model_predict_default(time_series_arima_plus_model: forecasting.ARIMAPlus): +def test_arima_plus_predict_default( + time_series_arima_plus_model: forecasting.ARIMAPlus, +): utc = pytz.utc predictions = time_series_arima_plus_model.predict().to_pandas() assert predictions.shape == (3, 8) @@ -63,7 +65,7 @@ def test_model_predict_default(time_series_arima_plus_model: forecasting.ARIMAPl ) -def test_model_predict_params(time_series_arima_plus_model: forecasting.ARIMAPlus): +def test_arima_plus_predict_params(time_series_arima_plus_model: forecasting.ARIMAPlus): utc = pytz.utc predictions = time_series_arima_plus_model.predict( horizon=4, confidence_level=0.9 @@ -94,7 +96,55 @@ def test_model_predict_params(time_series_arima_plus_model: forecasting.ARIMAPlu ) -def test_model_score( +def test_arima_plus_detect_anomalies( + time_series_arima_plus_model: forecasting.ARIMAPlus, new_time_series_df +): + anomalies = time_series_arima_plus_model.detect_anomalies( + new_time_series_df + ).to_pandas() + + expected = pd.DataFrame( + { + "is_anomaly": [False, False, False], + "lower_bound": [2349.301736, 2153.614829, 1849.040192], + "upper_bound": [3099.642833, 3033.12195, 2858.185876], + "anomaly_probability": [0.757824, 0.322559, 0.43011], + }, + ) + pd.testing.assert_frame_equal( + anomalies[["is_anomaly", "lower_bound", "upper_bound", "anomaly_probability"]], + expected, + rtol=0.1, + check_index_type=False, + check_dtype=False, + ) + + +def test_arima_plus_detect_anomalies_params( + time_series_arima_plus_model: forecasting.ARIMAPlus, new_time_series_df +): + anomalies = time_series_arima_plus_model.detect_anomalies( + new_time_series_df, anomaly_prob_threshold=0.7 + ).to_pandas() + + expected = pd.DataFrame( + { + "is_anomaly": [True, False, False], + "lower_bound": [2525.5363, 2360.1870, 2086.0609], + "upper_bound": [2923.408256, 2826.54981, 2621.165188], + "anomaly_probability": [0.757824, 0.322559, 0.43011], + }, + ) + pd.testing.assert_frame_equal( + anomalies[["is_anomaly", "lower_bound", "upper_bound", "anomaly_probability"]], + expected, + rtol=0.1, + check_index_type=False, + check_dtype=False, + ) + + +def test_arima_plus_score( time_series_arima_plus_model: forecasting.ARIMAPlus, new_time_series_df ): result = time_series_arima_plus_model.score( @@ -118,16 +168,14 @@ def test_model_score( ) -def test_model_summary( - time_series_arima_plus_model: forecasting.ARIMAPlus, new_time_series_df -): +def test_arima_plus_summary(time_series_arima_plus_model: forecasting.ARIMAPlus): result = time_series_arima_plus_model.summary() assert result.shape == (1, 12) assert all(column in result.columns for column in ARIMA_EVALUATE_OUTPUT_COL) -def test_model_summary_show_all_candidates( - time_series_arima_plus_model: forecasting.ARIMAPlus, new_time_series_df +def test_arima_plus_summary_show_all_candidates( + time_series_arima_plus_model: forecasting.ARIMAPlus, ): result = time_series_arima_plus_model.summary( show_all_candidate_models=True, @@ -136,7 +184,7 @@ def test_model_summary_show_all_candidates( assert all(column in result.columns for column in ARIMA_EVALUATE_OUTPUT_COL) -def test_model_score_series( +def test_arima_plus_score_series( time_series_arima_plus_model: forecasting.ARIMAPlus, new_time_series_df ): result = time_series_arima_plus_model.score( @@ -160,9 +208,7 @@ def test_model_score_series( ) -def test_model_summary_series( - time_series_arima_plus_model: forecasting.ARIMAPlus, new_time_series_df -): +def test_arima_plus_summary_series(time_series_arima_plus_model: forecasting.ARIMAPlus): result = time_series_arima_plus_model.summary() assert result.shape == (1, 12) assert all(column in result.columns for column in ARIMA_EVALUATE_OUTPUT_COL)