From 5efe88953577fcb155f9e1c787e42d0e79841159 Mon Sep 17 00:00:00 2001 From: Nick Pentreath Date: Thu, 16 Mar 2017 16:16:56 +0200 Subject: [PATCH 1/5] Pyspark Imputer --- .../org/apache/spark/ml/feature/Imputer.scala | 6 +- python/pyspark/ml/feature.py | 151 ++++++++++++++++++ python/pyspark/ml/tests.py | 10 ++ 3 files changed, 164 insertions(+), 3 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Imputer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Imputer.scala index b1a802ee13fc4..5c829e01dab01 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/Imputer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Imputer.scala @@ -94,7 +94,7 @@ private[feature] trait ImputerParams extends Params with HasInputCols { * :: Experimental :: * Imputation estimator for completing missing values, either using the mean or the median * of the column in which the missing values are located. The input column should be of - * DoubleType or FloatType. Currently Imputer does not support categorical features yet + * DoubleType or FloatType. Currently Imputer does not support categorical features * (SPARK-15041) and possibly creates incorrect values for a categorical feature. * * Note that the mean/median value is computed after filtering out missing values. @@ -176,8 +176,8 @@ object Imputer extends DefaultParamsReadable[Imputer] { * :: Experimental :: * Model fitted by [[Imputer]]. * - * @param surrogateDF a DataFrame contains inputCols and their corresponding surrogates, which are - * used to replace the missing values in the input DataFrame. + * @param surrogateDF a DataFrame containing inputCols and their corresponding surrogates, + * which are used to replace the missing values in the input DataFrame. */ @Experimental class ImputerModel private[ml]( diff --git a/python/pyspark/ml/feature.py b/python/pyspark/ml/feature.py index 92f8549e9cb9e..5e5982f8ee8e6 100755 --- a/python/pyspark/ml/feature.py +++ b/python/pyspark/ml/feature.py @@ -36,6 +36,7 @@ 'ElementwiseProduct', 'HashingTF', 'IDF', 'IDFModel', + 'Imputer', 'ImputerModel', 'IndexToString', 'MaxAbsScaler', 'MaxAbsScalerModel', 'MinHashLSH', 'MinHashLSHModel', @@ -870,6 +871,156 @@ def idf(self): return self._call_java("idf") +@inherit_doc +class Imputer(JavaEstimator, HasInputCols, JavaMLReadable, JavaMLWritable): + """ + Imputation estimator for completing missing values, either using the mean or the median + of the column in which the missing values are located. The input column should be of + DoubleType or FloatType. Currently Imputer does not support categorical features and + possibly creates incorrect values for a categorical feature. + + Note that the mean/median value is computed after filtering out missing values. + All Null values in the input column are treated as missing, and so are also imputed. For + computing median, :py:meth:`approxQuantile` is used with a relative error of 0.001. + + >>> df = spark.createDataFrame([(1.0, float("nan")), (2.0, float("nan")), (float("nan"), 3.0), + ... (4.0, 4.0), (5.0, 5.0)], ["a", "b"]) + >>> imputer = Imputer(inputCols=["a", "b"], outputCols=["out_a", "out_b"]) + >>> model = imputer.fit(df) + >>> model.surrogateDF.show() + +---+---+ + | a| b| + +---+---+ + |3.0|4.0| + +---+---+ + ... + >>> model.transform(df).show() + +---+---+-----+-----+ + | a| b|out_a|out_b| + +---+---+-----+-----+ + |1.0|NaN| 1.0| 4.0| + |2.0|NaN| 2.0| 4.0| + |NaN|3.0| 3.0| 3.0| + ... + >>> imputer.setStrategy("median").setMissingValue(1.0).fit(df).transform(df).show() + +---+---+-----+-----+ + | a| b|out_a|out_b| + +---+---+-----+-----+ + |1.0|NaN| 4.0| NaN| + ... + >>> imputerPath = temp_path + "/imputer" + >>> imputer.save(imputerPath) + >>> loadedImputer = Imputer.load(imputerPath) + >>> loadedImputer.getStrategy() == imputer.getStrategy() + True + >>> loadedImputer.getMissingValue() + 1.0 + >>> modelPath = temp_path + "/imputer-model" + >>> model.save(modelPath) + >>> loadedModel = ImputerModel.load(modelPath) + >>> loadedModel.transform(df).head().out_a == model.transform(df).head().out_a + True + + .. versionadded:: 2.2.0 + """ + + outputCols = Param(Params._dummy(), "outputCols", + "output column names.", typeConverter=TypeConverters.toListString) + + strategy = Param(Params._dummy(), "strategy", + "strategy for imputation. If mean, then replace missing values using the mean " + "value of the feature. If median, then replace missing values using the " + "median value of the feature.", + typeConverter=TypeConverters.toString) + + missingValue = Param(Params._dummy(), "missingValue", + "The placeholder for the missing values. All occurrences of missingValue " + "will be imputed.", typeConverter=TypeConverters.toFloat) + + @keyword_only + def __init__(self, strategy="mean", missingValue=float("nan"), inputCols=None, outputCols=None): + """ + __init__(self, strategy="mean", missingValue=float("nan"), inputCols=None, outputCols=None): + """ + super(Imputer, self).__init__() + self._java_obj = self._new_java_obj("org.apache.spark.ml.feature.Imputer", self.uid) + self._setDefault(strategy="mean", missingValue=float("nan")) + kwargs = self._input_kwargs + self.setParams(**kwargs) + + @keyword_only + @since("2.2.0") + def setParams(self, strategy="mean", missingValue=float("nan"), inputCols=None, outputCols=None): + """ + setParams(self, strategy="mean", missingValue=float("nan"), inputCols=None, outputCols=None) + Sets params for this Imputer. + """ + kwargs = self._input_kwargs + return self._set(**kwargs) + + @since("2.2.0") + def setOutputCols(self, value): + """ + Sets the value of :py:attr:`outputCols`. + """ + return self._set(outputCols=value) + + @since("2.2.0") + def getOutputCols(self): + """ + Gets the value of :py:attr:`outputCols` or its default value. + """ + return self.getOrDefault(self.outputCols) + + @since("2.2.0") + def setStrategy(self, value): + """ + Sets the value of :py:attr:`strategy`. + """ + return self._set(strategy=value) + + @since("2.2.0") + def getStrategy(self): + """ + Gets the value of :py:attr:`strategy` or its default value. + """ + return self.getOrDefault(self.strategy) + + @since("2.2.0") + def setMissingValue(self, value): + """ + Sets the value of :py:attr:`missingValue`. + """ + return self._set(missingValue=value) + + @since("2.2.0") + def getMissingValue(self): + """ + Gets the value of :py:attr:`missingValue` or its default value. + """ + return self.getOrDefault(self.missingValue) + + def _create_model(self, java_model): + return ImputerModel(java_model) + + +class ImputerModel(JavaModel, JavaMLReadable, JavaMLWritable): + """ + Model fitted by :py:class:`Imputer`. + + .. versionadded:: 2.2.0 + """ + + @property + @since("2.2.0") + def surrogateDF(self): + """ + Returns a DataFrame containing inputCols and their corresponding surrogates, + which are used to replace the missing values in the input DataFrame. + """ + return self._call_java("surrogateDF") + + @inherit_doc class MaxAbsScaler(JavaEstimator, HasInputCol, HasOutputCol, JavaMLReadable, JavaMLWritable): """ diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py index f052f5bb770c6..cc559db58720f 100755 --- a/python/pyspark/ml/tests.py +++ b/python/pyspark/ml/tests.py @@ -1273,6 +1273,7 @@ class DefaultValuesTests(PySparkTestCase): """ def check_params(self, py_stage): + import pyspark.ml.feature if not hasattr(py_stage, "_to_java"): return java_stage = py_stage._to_java() @@ -1292,6 +1293,15 @@ def check_params(self, py_stage): _java2py(self.sc, java_stage.clear(java_param).getOrDefault(java_param)) py_stage._clear(p) py_default = py_stage.getOrDefault(p) + if isinstance(py_stage, pyspark.ml.feature.Imputer) and p.name == "missingValue": + # SPARK-15040 - default value for Imputer param 'missingValue' is NaN, + # and NaN != NaN, so handle it specially here + import math + self.assertTrue(math.isnan(java_default) and math.isnan(py_default), + "Java default %s and python default %s are not both NaN for " + "param %s for Params %s" + % (str(java_default), str(py_default), p.name, str(py_stage))) + return self.assertEqual(java_default, py_default, "Java default %s != python default %s of param %s for Params %s" % (str(java_default), str(py_default), p.name, str(py_stage))) From 5e53e0562682a9a8a4259f21db5e2bd17372ce9d Mon Sep 17 00:00:00 2001 From: Nick Pentreath Date: Thu, 16 Mar 2017 16:35:40 +0200 Subject: [PATCH 2/5] fix line length --- python/pyspark/ml/feature.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/python/pyspark/ml/feature.py b/python/pyspark/ml/feature.py index 5e5982f8ee8e6..e877d3196b463 100755 --- a/python/pyspark/ml/feature.py +++ b/python/pyspark/ml/feature.py @@ -938,9 +938,11 @@ class Imputer(JavaEstimator, HasInputCols, JavaMLReadable, JavaMLWritable): "will be imputed.", typeConverter=TypeConverters.toFloat) @keyword_only - def __init__(self, strategy="mean", missingValue=float("nan"), inputCols=None, outputCols=None): + def __init__(self, strategy="mean", missingValue=float("nan"), inputCols=None, + outputCols=None): """ - __init__(self, strategy="mean", missingValue=float("nan"), inputCols=None, outputCols=None): + __init__(self, strategy="mean", missingValue=float("nan"), inputCols=None, \ + outputCols=None): """ super(Imputer, self).__init__() self._java_obj = self._new_java_obj("org.apache.spark.ml.feature.Imputer", self.uid) @@ -950,9 +952,11 @@ def __init__(self, strategy="mean", missingValue=float("nan"), inputCols=None, o @keyword_only @since("2.2.0") - def setParams(self, strategy="mean", missingValue=float("nan"), inputCols=None, outputCols=None): + def setParams(self, strategy="mean", missingValue=float("nan"), inputCols=None, + outputCols=None): """ - setParams(self, strategy="mean", missingValue=float("nan"), inputCols=None, outputCols=None) + setParams(self, strategy="mean", missingValue=float("nan"), inputCols=None, \ + outputCols=None) Sets params for this Imputer. """ kwargs = self._input_kwargs From 325c9cf5e82f4c01fd5bd2ea93ba7ab24b2af2c2 Mon Sep 17 00:00:00 2001 From: Nick Pentreath Date: Thu, 16 Mar 2017 16:37:51 +0200 Subject: [PATCH 3/5] Add Experimental tags --- python/pyspark/ml/feature.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/python/pyspark/ml/feature.py b/python/pyspark/ml/feature.py index e877d3196b463..eb2d37f337e6f 100755 --- a/python/pyspark/ml/feature.py +++ b/python/pyspark/ml/feature.py @@ -874,6 +874,8 @@ def idf(self): @inherit_doc class Imputer(JavaEstimator, HasInputCols, JavaMLReadable, JavaMLWritable): """ + .. note:: Experimental + Imputation estimator for completing missing values, either using the mean or the median of the column in which the missing values are located. The input column should be of DoubleType or FloatType. Currently Imputer does not support categorical features and @@ -1010,6 +1012,8 @@ def _create_model(self, java_model): class ImputerModel(JavaModel, JavaMLReadable, JavaMLWritable): """ + .. note:: Experimental + Model fitted by :py:class:`Imputer`. .. versionadded:: 2.2.0 From 5c272b5b7fb0988a8344e56ffc4e124128112879 Mon Sep 17 00:00:00 2001 From: Nick Pentreath Date: Mon, 20 Mar 2017 13:54:08 +0400 Subject: [PATCH 4/5] Column -> columns in doc comments --- .../src/main/scala/org/apache/spark/ml/feature/Imputer.scala | 4 ++-- python/pyspark/ml/feature.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Imputer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Imputer.scala index 5c829e01dab01..ec4c6ad75ee23 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/Imputer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Imputer.scala @@ -93,12 +93,12 @@ private[feature] trait ImputerParams extends Params with HasInputCols { /** * :: Experimental :: * Imputation estimator for completing missing values, either using the mean or the median - * of the column in which the missing values are located. The input column should be of + * of the columns in which the missing values are located. The input columns should be of * DoubleType or FloatType. Currently Imputer does not support categorical features * (SPARK-15041) and possibly creates incorrect values for a categorical feature. * * Note that the mean/median value is computed after filtering out missing values. - * All Null values in the input column are treated as missing, and so are also imputed. For + * All Null values in the input columns are treated as missing, and so are also imputed. For * computing median, DataFrameStatFunctions.approxQuantile is used with a relative error of 0.001. */ @Experimental diff --git a/python/pyspark/ml/feature.py b/python/pyspark/ml/feature.py index eb2d37f337e6f..41b6a906d29be 100755 --- a/python/pyspark/ml/feature.py +++ b/python/pyspark/ml/feature.py @@ -877,12 +877,12 @@ class Imputer(JavaEstimator, HasInputCols, JavaMLReadable, JavaMLWritable): .. note:: Experimental Imputation estimator for completing missing values, either using the mean or the median - of the column in which the missing values are located. The input column should be of + of the columns in which the missing values are located. The input columns should be of DoubleType or FloatType. Currently Imputer does not support categorical features and possibly creates incorrect values for a categorical feature. Note that the mean/median value is computed after filtering out missing values. - All Null values in the input column are treated as missing, and so are also imputed. For + All Null values in the input columns are treated as missing, and so are also imputed. For computing median, :py:meth:`approxQuantile` is used with a relative error of 0.001. >>> df = spark.createDataFrame([(1.0, float("nan")), (2.0, float("nan")), (float("nan"), 3.0), From 7fd17dd43441b2c7212f964efd921e8c2d429a9b Mon Sep 17 00:00:00 2001 From: Nick Pentreath Date: Tue, 21 Mar 2017 16:43:26 -0700 Subject: [PATCH 5/5] Fix doc for approxQuantile link --- python/pyspark/ml/feature.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/pyspark/ml/feature.py b/python/pyspark/ml/feature.py index 41b6a906d29be..8d25f5b3a771a 100755 --- a/python/pyspark/ml/feature.py +++ b/python/pyspark/ml/feature.py @@ -883,7 +883,8 @@ class Imputer(JavaEstimator, HasInputCols, JavaMLReadable, JavaMLWritable): Note that the mean/median value is computed after filtering out missing values. All Null values in the input columns are treated as missing, and so are also imputed. For - computing median, :py:meth:`approxQuantile` is used with a relative error of 0.001. + computing median, :py:meth:`pyspark.sql.DataFrame.approxQuantile` is used with a + relative error of `0.001`. >>> df = spark.createDataFrame([(1.0, float("nan")), (2.0, float("nan")), (float("nan"), 3.0), ... (4.0, 4.0), (5.0, 5.0)], ["a", "b"])