diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Imputer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Imputer.scala index ad1010da5c104..03ebe0299f63f 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/Imputer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Imputer.scala @@ -39,14 +39,16 @@ private[feature] trait ImputerParams extends Params with HasInputCol with HasInp * The imputation strategy. Currently only "mean" and "median" are supported. * If "mean", then replace missing values using the mean value of the feature. * If "median", then replace missing values using the approximate median value of the feature. + * If "mode", then replace missing using the most frequent value of the feature. * Default: mean * * @group param */ final val strategy: Param[String] = new Param(this, "strategy", s"strategy for imputation. " + s"If ${Imputer.mean}, then replace missing values using the mean value of the feature. " + - s"If ${Imputer.median}, then replace missing values using the median value of the feature.", - ParamValidators.inArray[String](Array(Imputer.mean, Imputer.median))) + s"If ${Imputer.median}, then replace missing values using the median value of the feature. " + + s"If ${Imputer.mode}, then replace missing values using the most frequent value of " + + s"the feature.", ParamValidators.inArray[String](Imputer.supportedStrategies)) /** @group getParam */ def getStrategy: String = $(strategy) @@ -104,7 +106,7 @@ private[feature] trait ImputerParams extends Params with HasInputCol with HasInp * For example, if the input column is IntegerType (1, 2, 4, null), * the output will be IntegerType (1, 2, 4, 2) after mean imputation. * - * Note that the mean/median value is computed after filtering out missing values. + * Note that the mean/median/mode value is computed after filtering out missing values. * All Null values in the input columns are treated as missing, and so are also imputed. For * computing median, DataFrameStatFunctions.approxQuantile is used with a relative error of 0.001. */ @@ -132,7 +134,7 @@ class Imputer @Since("2.2.0") (@Since("2.2.0") override val uid: String) def setOutputCols(value: Array[String]): this.type = set(outputCols, value) /** - * Imputation strategy. Available options are ["mean", "median"]. + * Imputation strategy. Available options are ["mean", "median", "mode"]. * @group setParam */ @Since("2.2.0") @@ -151,39 +153,42 @@ class Imputer @Since("2.2.0") (@Since("2.2.0") override val uid: String) val spark = dataset.sparkSession val (inputColumns, _) = getInOutCols() - val cols = inputColumns.map { inputCol => when(col(inputCol).equalTo($(missingValue)), null) .when(col(inputCol).isNaN, null) .otherwise(col(inputCol)) - .cast("double") + .cast(DoubleType) .as(inputCol) } + val numCols = cols.length val results = $(strategy) match { case Imputer.mean => // Function avg will ignore null automatically. // For a column only containing null, avg will return null. val row = dataset.select(cols.map(avg): _*).head() - Array.range(0, inputColumns.length).map { i => - if (row.isNullAt(i)) { - Double.NaN - } else { - row.getDouble(i) - } - } + Array.tabulate(numCols)(i => if (row.isNullAt(i)) Double.NaN else row.getDouble(i)) case Imputer.median => // Function approxQuantile will ignore null automatically. // For a column only containing null, approxQuantile will return an empty array. dataset.select(cols: _*).stat.approxQuantile(inputColumns, Array(0.5), $(relativeError)) - .map { array => - if (array.isEmpty) { - Double.NaN - } else { - array.head - } - } + .map(_.headOption.getOrElse(Double.NaN)) + + case Imputer.mode => + import spark.implicits._ + // If there is more than one mode, choose the smallest one to keep in line + // with sklearn.impute.SimpleImputer (using scipy.stats.mode). + val modes = dataset.select(cols: _*).flatMap { row => + // Ignore null. + Iterator.range(0, numCols) + .flatMap(i => if (row.isNullAt(i)) None else Some((i, row.getDouble(i)))) + }.toDF("index", "value") + .groupBy("index", "value").agg(negate(count(lit(0))).as("negative_count")) + .groupBy("index").agg(min(struct("negative_count", "value")).as("mode")) + .select("index", "mode.value") + .as[(Int, Double)].collect().toMap + Array.tabulate(numCols)(i => modes.getOrElse(i, Double.NaN)) } val emptyCols = inputColumns.zip(results).filter(_._2.isNaN).map(_._1) @@ -212,6 +217,10 @@ object Imputer extends DefaultParamsReadable[Imputer] { /** strategy names that Imputer currently supports. */ private[feature] val mean = "mean" private[feature] val median = "median" + private[feature] val mode = "mode" + + /* Set of strategies that Imputer supports */ + private[feature] val supportedStrategies = Array(mean, median, mode) @Since("2.2.0") override def load(path: String): Imputer = super.load(path) diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/ImputerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/ImputerSuite.scala index dfee2b4029c8b..30887f55638f9 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/ImputerSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/ImputerSuite.scala @@ -28,13 +28,14 @@ import org.apache.spark.sql.types._ class ImputerSuite extends MLTest with DefaultReadWriteTest { test("Imputer for Double with default missing Value NaN") { - val df = spark.createDataFrame( Seq( - (0, 1.0, 4.0, 1.0, 1.0, 4.0, 4.0), - (1, 11.0, 12.0, 11.0, 11.0, 12.0, 12.0), - (2, 3.0, Double.NaN, 3.0, 3.0, 10.0, 12.0), - (3, Double.NaN, 14.0, 5.0, 3.0, 14.0, 14.0) - )).toDF("id", "value1", "value2", "expected_mean_value1", "expected_median_value1", - "expected_mean_value2", "expected_median_value2") + val df = spark.createDataFrame(Seq( + (0, 1.0, 4.0, 1.0, 1.0, 1.0, 4.0, 4.0, 4.0), + (1, 11.0, 12.0, 11.0, 11.0, 11.0, 12.0, 12.0, 12.0), + (2, 3.0, Double.NaN, 3.0, 3.0, 3.0, 10.0, 12.0, 4.0), + (3, Double.NaN, 14.0, 5.0, 3.0, 1.0, 14.0, 14.0, 14.0) + )).toDF("id", "value1", "value2", + "expected_mean_value1", "expected_median_value1", "expected_mode_value1", + "expected_mean_value2", "expected_median_value2", "expected_mode_value2") val imputer = new Imputer() .setInputCols(Array("value1", "value2")) .setOutputCols(Array("out1", "out2")) @@ -42,23 +43,25 @@ class ImputerSuite extends MLTest with DefaultReadWriteTest { } test("Single Column: Imputer for Double with default missing Value NaN") { - val df1 = spark.createDataFrame( Seq( - (0, 1.0, 1.0, 1.0), - (1, 11.0, 11.0, 11.0), - (2, 3.0, 3.0, 3.0), - (3, Double.NaN, 5.0, 3.0) - )).toDF("id", "value", "expected_mean_value", "expected_median_value") + val df1 = spark.createDataFrame(Seq( + (0, 1.0, 1.0, 1.0, 1.0), + (1, 11.0, 11.0, 11.0, 11.0), + (2, 3.0, 3.0, 3.0, 3.0), + (3, Double.NaN, 5.0, 3.0, 1.0) + )).toDF("id", "value", + "expected_mean_value", "expected_median_value", "expected_mode_value") val imputer1 = new Imputer() .setInputCol("value") .setOutputCol("out") ImputerSuite.iterateStrategyTest(false, imputer1, df1) - val df2 = spark.createDataFrame( Seq( - (0, 4.0, 4.0, 4.0), - (1, 12.0, 12.0, 12.0), - (2, Double.NaN, 10.0, 12.0), - (3, 14.0, 14.0, 14.0) - )).toDF("id", "value", "expected_mean_value", "expected_median_value") + val df2 = spark.createDataFrame(Seq( + (0, 4.0, 4.0, 4.0, 4.0), + (1, 12.0, 12.0, 12.0, 12.0), + (2, Double.NaN, 10.0, 12.0, 4.0), + (3, 14.0, 14.0, 14.0, 14.0) + )).toDF("id", "value", + "expected_mean_value", "expected_median_value", "expected_mode_value") val imputer2 = new Imputer() .setInputCol("value") .setOutputCol("out") @@ -66,12 +69,13 @@ class ImputerSuite extends MLTest with DefaultReadWriteTest { } test("Imputer should handle NaNs when computing surrogate value, if missingValue is not NaN") { - val df = spark.createDataFrame( Seq( - (0, 1.0, 1.0, 1.0), - (1, 3.0, 3.0, 3.0), - (2, Double.NaN, Double.NaN, Double.NaN), - (3, -1.0, 2.0, 1.0) - )).toDF("id", "value", "expected_mean_value", "expected_median_value") + val df = spark.createDataFrame(Seq( + (0, 1.0, 1.0, 1.0, 1.0), + (1, 3.0, 3.0, 3.0, 3.0), + (2, Double.NaN, Double.NaN, Double.NaN, Double.NaN), + (3, -1.0, 2.0, 1.0, 1.0) + )).toDF("id", "value", + "expected_mean_value", "expected_median_value", "expected_mode_value") val imputer = new Imputer().setInputCols(Array("value")).setOutputCols(Array("out")) .setMissingValue(-1.0) ImputerSuite.iterateStrategyTest(true, imputer, df) @@ -79,64 +83,69 @@ class ImputerSuite extends MLTest with DefaultReadWriteTest { test("Single Column: Imputer should handle NaNs when computing surrogate value," + " if missingValue is not NaN") { - val df = spark.createDataFrame( Seq( - (0, 1.0, 1.0, 1.0), - (1, 3.0, 3.0, 3.0), - (2, Double.NaN, Double.NaN, Double.NaN), - (3, -1.0, 2.0, 1.0) - )).toDF("id", "value", "expected_mean_value", "expected_median_value") + val df = spark.createDataFrame(Seq( + (0, 1.0, 1.0, 1.0, 1.0), + (1, 3.0, 3.0, 3.0, 3.0), + (2, Double.NaN, Double.NaN, Double.NaN, Double.NaN), + (3, -1.0, 2.0, 1.0, 1.0) + )).toDF("id", "value", + "expected_mean_value", "expected_median_value", "expected_mode_value") val imputer = new Imputer().setInputCol("value").setOutputCol("out") .setMissingValue(-1.0) ImputerSuite.iterateStrategyTest(false, imputer, df) } test("Imputer for Float with missing Value -1.0") { - val df = spark.createDataFrame( Seq( - (0, 1.0F, 1.0F, 1.0F), - (1, 3.0F, 3.0F, 3.0F), - (2, 10.0F, 10.0F, 10.0F), - (3, 10.0F, 10.0F, 10.0F), - (4, -1.0F, 6.0F, 3.0F) - )).toDF("id", "value", "expected_mean_value", "expected_median_value") + val df = spark.createDataFrame(Seq( + (0, 1.0F, 1.0F, 1.0F, 1.0F), + (1, 3.0F, 3.0F, 3.0F, 3.0F), + (2, 10.0F, 10.0F, 10.0F, 10.0F), + (3, 10.0F, 10.0F, 10.0F, 10.0F), + (4, -1.0F, 6.0F, 3.0F, 10.0F) + )).toDF("id", "value", + "expected_mean_value", "expected_median_value", "expected_mode_value") val imputer = new Imputer().setInputCols(Array("value")).setOutputCols(Array("out")) .setMissingValue(-1) ImputerSuite.iterateStrategyTest(true, imputer, df) } test("Single Column: Imputer for Float with missing Value -1.0") { - val df = spark.createDataFrame( Seq( - (0, 1.0F, 1.0F, 1.0F), - (1, 3.0F, 3.0F, 3.0F), - (2, 10.0F, 10.0F, 10.0F), - (3, 10.0F, 10.0F, 10.0F), - (4, -1.0F, 6.0F, 3.0F) - )).toDF("id", "value", "expected_mean_value", "expected_median_value") + val df = spark.createDataFrame(Seq( + (0, 1.0F, 1.0F, 1.0F, 1.0F), + (1, 3.0F, 3.0F, 3.0F, 3.0F), + (2, 10.0F, 10.0F, 10.0F, 10.0F), + (3, 10.0F, 10.0F, 10.0F, 10.0F), + (4, -1.0F, 6.0F, 3.0F, 10.0F) + )).toDF("id", "value", + "expected_mean_value", "expected_median_value", "expected_mode_value") val imputer = new Imputer().setInputCol("value").setOutputCol("out") .setMissingValue(-1) ImputerSuite.iterateStrategyTest(false, imputer, df) } test("Imputer should impute null as well as 'missingValue'") { - val rawDf = spark.createDataFrame( Seq( - (0, 4.0, 4.0, 4.0), - (1, 10.0, 10.0, 10.0), - (2, 10.0, 10.0, 10.0), - (3, Double.NaN, 8.0, 10.0), - (4, -1.0, 8.0, 10.0) - )).toDF("id", "rawValue", "expected_mean_value", "expected_median_value") + val rawDf = spark.createDataFrame(Seq( + (0, 4.0, 4.0, 4.0, 4.0), + (1, 10.0, 10.0, 10.0, 10.0), + (2, 10.0, 10.0, 10.0, 10.0), + (3, Double.NaN, 8.0, 10.0, 10.0), + (4, -1.0, 8.0, 10.0, 10.0) + )).toDF("id", "rawValue", + "expected_mean_value", "expected_median_value", "expected_mode_value") val df = rawDf.selectExpr("*", "IF(rawValue=-1.0, null, rawValue) as value") val imputer = new Imputer().setInputCols(Array("value")).setOutputCols(Array("out")) ImputerSuite.iterateStrategyTest(true, imputer, df) } test("Single Column: Imputer should impute null as well as 'missingValue'") { - val rawDf = spark.createDataFrame( Seq( - (0, 4.0, 4.0, 4.0), - (1, 10.0, 10.0, 10.0), - (2, 10.0, 10.0, 10.0), - (3, Double.NaN, 8.0, 10.0), - (4, -1.0, 8.0, 10.0) - )).toDF("id", "rawValue", "expected_mean_value", "expected_median_value") + val rawDf = spark.createDataFrame(Seq( + (0, 4.0, 4.0, 4.0, 4.0), + (1, 10.0, 10.0, 10.0, 10.0), + (2, 10.0, 10.0, 10.0, 10.0), + (3, Double.NaN, 8.0, 10.0, 10.0), + (4, -1.0, 8.0, 10.0, 10.0) + )).toDF("id", "rawValue", + "expected_mean_value", "expected_median_value", "expected_mode_value") val df = rawDf.selectExpr("*", "IF(rawValue=-1.0, null, rawValue) as value") val imputer = new Imputer().setInputCol("value").setOutputCol("out") ImputerSuite.iterateStrategyTest(false, imputer, df) @@ -187,7 +196,7 @@ class ImputerSuite extends MLTest with DefaultReadWriteTest { } test("Imputer throws exception when surrogate cannot be computed") { - val df = spark.createDataFrame( Seq( + val df = spark.createDataFrame(Seq( (0, Double.NaN, 1.0, 1.0), (1, Double.NaN, 3.0, 3.0), (2, Double.NaN, Double.NaN, Double.NaN) @@ -205,12 +214,13 @@ class ImputerSuite extends MLTest with DefaultReadWriteTest { } test("Single Column: Imputer throws exception when surrogate cannot be computed") { - val df = spark.createDataFrame( Seq( - (0, Double.NaN, 1.0, 1.0), - (1, Double.NaN, 3.0, 3.0), - (2, Double.NaN, Double.NaN, Double.NaN) - )).toDF("id", "value", "expected_mean_value", "expected_median_value") - Seq("mean", "median").foreach { strategy => + val df = spark.createDataFrame(Seq( + (0, Double.NaN, 1.0, 1.0, 1.0), + (1, Double.NaN, 3.0, 3.0, 3.0), + (2, Double.NaN, Double.NaN, Double.NaN, Double.NaN) + )).toDF("id", "value", + "expected_mean_value", "expected_median_value", "expected_mode_value") + Seq("mean", "median", "mode").foreach { strategy => val imputer = new Imputer().setInputCol("value").setOutputCol("out") .setStrategy(strategy) withClue("Imputer should fail all the values are invalid") { @@ -223,12 +233,12 @@ class ImputerSuite extends MLTest with DefaultReadWriteTest { } test("Imputer input & output column validation") { - val df = spark.createDataFrame( Seq( + val df = spark.createDataFrame(Seq( (0, 1.0, 1.0, 1.0), (1, Double.NaN, 3.0, 3.0), (2, Double.NaN, Double.NaN, Double.NaN) )).toDF("id", "value1", "value2", "value3") - Seq("mean", "median").foreach { strategy => + Seq("mean", "median", "mode").foreach { strategy => withClue("Imputer should fail if inputCols and outputCols are different length") { val e: IllegalArgumentException = intercept[IllegalArgumentException] { val imputer = new Imputer().setStrategy(strategy) @@ -306,13 +316,13 @@ class ImputerSuite extends MLTest with DefaultReadWriteTest { } test("Imputer for IntegerType with default missing value null") { - - val df = spark.createDataFrame(Seq[(Integer, Integer, Integer)]( - (1, 1, 1), - (11, 11, 11), - (3, 3, 3), - (null, 5, 3) - )).toDF("value1", "expected_mean_value1", "expected_median_value1") + val df = spark.createDataFrame(Seq[(Integer, Integer, Integer, Integer)]( + (1, 1, 1, 1), + (11, 11, 11, 11), + (3, 3, 3, 3), + (null, 5, 3, 1) + )).toDF("value1", + "expected_mean_value1", "expected_median_value1", "expected_mode_value1") val imputer = new Imputer() .setInputCols(Array("value1")) @@ -327,12 +337,13 @@ class ImputerSuite extends MLTest with DefaultReadWriteTest { } test("Single Column Imputer for IntegerType with default missing value null") { - val df = spark.createDataFrame(Seq[(Integer, Integer, Integer)]( - (1, 1, 1), - (11, 11, 11), - (3, 3, 3), - (null, 5, 3) - )).toDF("value", "expected_mean_value", "expected_median_value") + val df = spark.createDataFrame(Seq[(Integer, Integer, Integer, Integer)]( + (1, 1, 1, 1), + (11, 11, 11, 11), + (3, 3, 3, 3), + (null, 5, 3, 1) + )).toDF("value", + "expected_mean_value", "expected_median_value", "expected_mode_value") val imputer = new Imputer() .setInputCol("value") @@ -347,13 +358,13 @@ class ImputerSuite extends MLTest with DefaultReadWriteTest { } test("Imputer for IntegerType with missing value -1") { - - val df = spark.createDataFrame(Seq[(Integer, Integer, Integer)]( - (1, 1, 1), - (11, 11, 11), - (3, 3, 3), - (-1, 5, 3) - )).toDF("value1", "expected_mean_value1", "expected_median_value1") + val df = spark.createDataFrame(Seq[(Integer, Integer, Integer, Integer)]( + (1, 1, 1, 1), + (11, 11, 11, 11), + (3, 3, 3, 3), + (-1, 5, 3, 1) + )).toDF("value1", + "expected_mean_value1", "expected_median_value1", "expected_mode_value1") val imputer = new Imputer() .setInputCols(Array("value1")) @@ -369,12 +380,13 @@ class ImputerSuite extends MLTest with DefaultReadWriteTest { } test("Single Column: Imputer for IntegerType with missing value -1") { - val df = spark.createDataFrame(Seq[(Integer, Integer, Integer)]( - (1, 1, 1), - (11, 11, 11), - (3, 3, 3), - (-1, 5, 3) - )).toDF("value", "expected_mean_value", "expected_median_value") + val df = spark.createDataFrame(Seq[(Integer, Integer, Integer, Integer)]( + (1, 1, 1, 1), + (11, 11, 11, 11), + (3, 3, 3, 3), + (-1, 5, 3, 1) + )).toDF("value", + "expected_mean_value", "expected_median_value", "expected_mode_value") val imputer = new Imputer() .setInputCol("value") @@ -402,13 +414,13 @@ class ImputerSuite extends MLTest with DefaultReadWriteTest { } test("Compare single/multiple column(s) Imputer in pipeline") { - val df = spark.createDataFrame( Seq( + val df = spark.createDataFrame(Seq( (0, 1.0, 4.0), (1, 11.0, 12.0), (2, 3.0, Double.NaN), (3, Double.NaN, 14.0) )).toDF("id", "value1", "value2") - Seq("mean", "median").foreach { strategy => + Seq("mean", "median", "mode").foreach { strategy => val multiColsImputer = new Imputer() .setInputCols(Array("value1", "value2")) .setOutputCols(Array("result1", "result2")) @@ -450,11 +462,12 @@ class ImputerSuite extends MLTest with DefaultReadWriteTest { object ImputerSuite { /** - * Imputation strategy. Available options are ["mean", "median"]. - * @param df DataFrame with columns "id", "value", "expected_mean", "expected_median" + * Imputation strategy. Available options are ["mean", "median", "mode"]. + * @param df DataFrame with columns "id", "value", "expected_mean", "expected_median", + * "expected_mode". */ def iterateStrategyTest(isMultiCol: Boolean, imputer: Imputer, df: DataFrame): Unit = { - Seq("mean", "median").foreach { strategy => + Seq("mean", "median", "mode").foreach { strategy => imputer.setStrategy(strategy) val model = imputer.fit(df) val resultDF = model.transform(df) diff --git a/python/pyspark/ml/feature.py b/python/pyspark/ml/feature.py index 4d898bd5fffa8..82b9a6db1eb92 100755 --- a/python/pyspark/ml/feature.py +++ b/python/pyspark/ml/feature.py @@ -1507,7 +1507,8 @@ class _ImputerParams(HasInputCol, HasInputCols, HasOutputCol, HasOutputCols, Has strategy = Param(Params._dummy(), "strategy", "strategy for imputation. If mean, then replace missing values using the mean " "value of the feature. If median, then replace missing values using the " - "median value of the feature.", + "median value of the feature. If mode, then replace missing using the most " + "frequent value of the feature.", typeConverter=TypeConverters.toString) missingValue = Param(Params._dummy(), "missingValue", @@ -1541,7 +1542,7 @@ class Imputer(JavaEstimator, _ImputerParams, JavaMLReadable, JavaMLWritable): numeric type. Currently Imputer does not support categorical features and possibly creates incorrect values for a categorical feature. - Note that the mean/median value is computed after filtering out missing values. + Note that the mean/median/mode value is computed after filtering out missing values. All Null values in the input columns are treated as missing, and so are also imputed. For computing median, :py:meth:`pyspark.sql.DataFrame.approxQuantile` is used with a relative error of `0.001`.