diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/OneHotEncoder.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/OneHotEncoder.scala
index 9137d5fbb10bf..5ab6c2dde667a 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/OneHotEncoder.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/OneHotEncoder.scala
@@ -90,7 +90,7 @@ class OneHotEncoder @Since("1.4.0") (@Since("1.4.0") override val uid: String) e
       s"Output column $outputColName already exists.")
 
     val outputField = OneHotEncoderCommon.transformOutputColumnSchema(
-      schema(inputColName), $(dropLast), outputColName)
+      schema(inputColName), outputColName, $(dropLast))
     val outputFields = inputFields :+ outputField
     StructType(outputFields)
   }
@@ -106,7 +106,7 @@
     val outputAttrGroup = if (outputAttrGroupFromSchema.size < 0) {
       OneHotEncoderCommon.getOutputAttrGroupFromData(
-        dataset, $(dropLast), Seq(inputColName), Seq(outputColName))(0)
+        dataset, Seq(inputColName), Seq(outputColName), $(dropLast))(0)
     } else {
       outputAttrGroupFromSchema
     }
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/OneHotEncoderEstimator.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/OneHotEncoderEstimator.scala
index b915fbb2f49e3..d489f0a12f96e 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/OneHotEncoderEstimator.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/OneHotEncoderEstimator.scala
@@ -65,7 +65,8 @@ private[ml] trait OneHotEncoderBase extends Params with HasHandleInvalid
   @Since("2.3.0")
   def getDropLast: Boolean = $(dropLast)
 
-  protected def validateAndTransformSchema(schema: StructType): StructType = {
+  protected def validateAndTransformSchema(
+      schema: StructType, dropLast: Boolean, keepInvalid: Boolean): StructType = {
     val inputColNames = $(inputCols)
     val outputColNames = $(outputCols)
     val existingFields = schema.fields
@@ -74,22 +75,19 @@ private[ml] trait OneHotEncoderBase extends Params with HasHandleInvalid
       s"The number of input columns ${inputColNames.length} must be the same as the number of " +
       s"output columns ${outputColNames.length}.")
 
-    inputColNames.zip(outputColNames).map { case (inputColName, outputColName) =>
-      require(schema(inputColName).dataType.isInstanceOf[NumericType],
-        s"Input column must be of type NumericType but got ${schema(inputColName).dataType}")
-      require(!existingFields.exists(_.name == outputColName),
-        s"Output column $outputColName already exists.")
-    }
+    // Input columns must be NumericType.
+    inputColNames.foreach(SchemaUtils.checkNumericType(schema, _))
 
     // Prepares output columns with proper attributes by examining input columns.
     val inputFields = $(inputCols).map(schema(_))
-    val keepInvalid = $(handleInvalid) == OneHotEncoderEstimator.KEEP_INVALID
 
     val outputFields = inputFields.zip(outputColNames).map { case (inputField, outputColName) =>
      OneHotEncoderCommon.transformOutputColumnSchema(
-        inputField, $(dropLast), outputColName, keepInvalid)
+        inputField, outputColName, dropLast, keepInvalid)
+    }
+    outputFields.foldLeft(schema) { case (newSchema, outputField) =>
+      SchemaUtils.appendColumn(newSchema, outputField)
     }
-    StructType(schema.fields ++ outputFields)
   }
 }
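A note on the refactor above: output columns are now appended one at a time, so duplicate-name checking can be delegated to SchemaUtils. A minimal standalone sketch of the fold-append pattern (column names are illustrative, and since SchemaUtils is private to Spark, a plain append with a duplicate check stands in for SchemaUtils.appendColumn):

import org.apache.spark.sql.types.{DoubleType, StructField, StructType}

val input = StructType(Seq(StructField("input1", DoubleType), StructField("input2", DoubleType)))
val outputFields = Seq(StructField("output1", DoubleType), StructField("output2", DoubleType))

// Each step appends one output column to the schema accumulated so far, so the
// duplicate check sees both pre-existing columns and columns added earlier in the fold.
val result = outputFields.foldLeft(input) { case (schema, field) =>
  require(!schema.fieldNames.contains(field.name), s"Output column ${field.name} already exists.")
  StructType(schema.fields :+ field)
}
// result.fieldNames: Array(input1, input2, output1, output2)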
@@ -109,6 +107,9 @@ private[ml] trait OneHotEncoderBase extends Params with HasHandleInvalid
  * added as last category. So when `dropLast` is true, invalid values are encoded as all-zeros
  * vector.
  *
+ * @note When encoding multiple columns via `inputCols` and `outputCols`, input and output columns
+ * come in pairs, specified by their order in the arrays, and each pair is treated independently.
+ *
  * @see `StringIndexer` for converting categorical values into category indices
  */
 @Since("2.3.0")
@@ -136,7 +137,9 @@ class OneHotEncoderEstimator @Since("2.3.0") (@Since("2.3.0") override val uid:
 
   @Since("2.3.0")
   override def transformSchema(schema: StructType): StructType = {
-    validateAndTransformSchema(schema)
+    // When fitting data, we want the plain number of categories without `handleInvalid` and
+    // `dropLast` taken into account.
+    validateAndTransformSchema(schema, dropLast = false, keepInvalid = false)
   }
 
   @Since("2.3.0")
@@ -160,9 +163,11 @@ class OneHotEncoderEstimator @Since("2.3.0") (@Since("2.3.0") override val uid:
     if (columnToScanIndices.length > 0) {
       val inputColNames = columnToScanIndices.map($(inputCols)(_))
       val outputColNames = columnToScanIndices.map($(outputCols)(_))
-      val keepInvalid = $(handleInvalid) == OneHotEncoderEstimator.KEEP_INVALID
+
+      // When fitting data, we want the plain number of categories without `handleInvalid` and
+      // `dropLast` taken into account.
       val attrGroups = OneHotEncoderCommon.getOutputAttrGroupFromData(
-        dataset, $(dropLast), inputColNames, outputColNames, keepInvalid)
+        dataset, inputColNames, outputColNames, dropLast = false)
       attrGroups.zip(columnToScanIndices).foreach { case (attrGroup, idx) =>
         categorySizes(idx) = attrGroup.size
       }
@@ -195,6 +200,26 @@ class OneHotEncoderModel private[ml] (
 
   import OneHotEncoderModel._
 
+  // The actual number of categories varies with the settings of `dropLast` and
+  // `handleInvalid`.
+  private def configedCategorySizes: Array[Int] = {
+    val dropLast = getDropLast
+    val keepInvalid = getHandleInvalid == OneHotEncoderEstimator.KEEP_INVALID
+
+    if (!dropLast && keepInvalid) {
+      // When `handleInvalid` is "keep", an extra category is added as the last category
+      // for invalid data.
+      categorySizes.map(_ + 1)
+    } else if (dropLast && !keepInvalid) {
+      // When `dropLast` is true, the last category is removed.
+      categorySizes.map(_ - 1)
+    } else {
+      // Either `dropLast` removes exactly the extra category that `handleInvalid` "keep"
+      // adds, or neither flag applies, so the plain number of categories is correct.
+      categorySizes
+    }
+  }
+
   private def encoder: UserDefinedFunction = {
     val oneValue = Array(1.0)
     val emptyValues = Array.empty[Double]
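The interaction of the two flags reduces to simple arithmetic on the fitted category count. A self-contained sketch (configedSize is a hypothetical single-column analogue of configedCategorySizes), with a worked example for a column fitted with 3 categories:

// Mirrors the per-column logic of configedCategorySizes.
def configedSize(size: Int, dropLast: Boolean, keepInvalid: Boolean): Int =
  if (!dropLast && keepInvalid) size + 1       // extra last slot reserved for invalid values
  else if (dropLast && !keepInvalid) size - 1  // last category dropped
  else size                                    // effects cancel out, or neither applies

assert(configedSize(3, dropLast = false, keepInvalid = false) == 3)
assert(configedSize(3, dropLast = true, keepInvalid = false) == 2)
assert(configedSize(3, dropLast = false, keepInvalid = true) == 4)
assert(configedSize(3, dropLast = true, keepInvalid = true) == 3)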
@@ -205,21 +230,29 @@ class OneHotEncoderModel private[ml] (
 
     udf { (label: Double, size: Int) =>
       val numCategory = if (!dropLast && keepInvalid) {
-        // When `handleInvalid` is 'keep' and `dropLast` is false, the last category is
+        // When `dropLast` is false and `handleInvalid` is "keep", the last category is
         // for invalid data.
         size - 1
       } else {
         size
       }
 
-      if (label < numCategory) {
+      if (label < 0) {
+        throw new SparkException(s"Negative value: $label. Input can't be negative.")
+      } else if (label < numCategory) {
         Vectors.sparse(size, Array(label.toInt), oneValue)
       } else if (label == numCategory && dropLast && !keepInvalid) {
+        // When `dropLast` is true and `handleInvalid` is not "keep",
+        // the last category is removed.
         Vectors.sparse(size, emptyIndices, emptyValues)
       } else if (dropLast && keepInvalid) {
+        // When `dropLast` is true and `handleInvalid` is "keep",
+        // invalid data is encoded to the removed last category.
         Vectors.sparse(size, emptyIndices, emptyValues)
       } else if (keepInvalid) {
-        Vectors.sparse(size, Array(size - 1), oneValue)
+        // When `dropLast` is false and `handleInvalid` is "keep",
+        // invalid data is encoded to the last category.
+        Vectors.sparse(size, Array(numCategory), oneValue)
       } else {
         assert(handleInvalid == OneHotEncoderEstimator.ERROR_INVALID)
         throw new SparkException(s"Unseen value: $label. To handle unseen values, " +
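To make the branch logic concrete, a worked example for a column fitted with 3 categories, where size is the configured vector width passed into the udf:

// dropLast = false, handleInvalid = "keep"  -> size 4, numCategory 3:
//   label 1.0 -> (4,[1],[1.0]);  unseen 5.0 -> (4,[3],[1.0])  (the reserved last slot)
// dropLast = true,  handleInvalid = "keep"  -> size 3, numCategory 3:
//   label 2.0 -> (3,[2],[1.0]);  unseen 5.0 -> (3,[],[])      (all zeros)
// dropLast = true,  handleInvalid = "error" -> size 2, numCategory 2:
//   label 2.0 -> (2,[],[])  (the dropped last category);  unseen 5.0 -> SparkException
// Any setting: label -1.0 -> SparkException ("Negative value: ...")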
@@ -253,26 +286,29 @@ class OneHotEncoderModel private[ml] (
       s"The number of input columns ${inputColNames.length} must be the same as the number of " +
       s"features ${categorySizes.length} during fitting.")
 
-    val transformedSchema = validateAndTransformSchema(schema)
+    val keepInvalid = $(handleInvalid) == OneHotEncoderEstimator.KEEP_INVALID
+    val transformedSchema = validateAndTransformSchema(schema, dropLast = $(dropLast),
+      keepInvalid = keepInvalid)
     verifyNumOfValues(transformedSchema)
   }
 
   /**
    * If the metadata of input columns also specifies the number of categories, we need to
-   * compare with expected category number obtained during fitting. Mismatched numbers will
-   * cause exception.
+   * compare it with the expected category number, with `handleInvalid` and `dropLast` taken
+   * into account. Mismatched numbers will cause an exception.
    */
   private def verifyNumOfValues(schema: StructType): StructType = {
     $(outputCols).zipWithIndex.foreach { case (outputColName, idx) =>
       val inputColName = $(inputCols)(idx)
       val attrGroup = AttributeGroup.fromStructField(schema(outputColName))
 
-      // If the input metadata specifies number of category,
-      // compare with expected category number.
+      // If the metadata of the output column specifies the number of categories, compare it
+      // with the expected category number, with `handleInvalid` and `dropLast` taken
+      // into account.
       if (attrGroup.attributes.nonEmpty) {
-        require(attrGroup.size == categorySizes(idx), "OneHotEncoderModel expected " +
-          s"${categorySizes(idx)} categorical values for input column ${inputColName}, but " +
-          s"the input column had metadata specifying ${attrGroup.size} values.")
+        require(attrGroup.size == configedCategorySizes(idx), "OneHotEncoderModel expected " +
+          s"${configedCategorySizes(idx)} categorical values for input column ${inputColName}, " +
+          s"but the input column had metadata specifying ${attrGroup.size} values.")
       }
     }
     schema
@@ -281,6 +317,7 @@ class OneHotEncoderModel private[ml] (
   @Since("2.3.0")
   override def transform(dataset: Dataset[_]): DataFrame = {
     val transformedSchema = transformSchema(dataset.schema, logging = true)
+    val keepInvalid = $(handleInvalid) == OneHotEncoderEstimator.KEEP_INVALID
 
     val encodedColumns = (0 until $(inputCols).length).map { idx =>
       val inputColName = $(inputCols)(idx)
@@ -290,13 +327,13 @@ class OneHotEncoderModel private[ml] (
         AttributeGroup.fromStructField(transformedSchema(outputColName))
 
       val metadata = if (outputAttrGroupFromSchema.size < 0) {
-        OneHotEncoderCommon.createAttrGroupForAttrNames(outputColName, false,
-          categorySizes(idx)).toMetadata()
+        OneHotEncoderCommon.createAttrGroupForAttrNames(outputColName,
+          categorySizes(idx), $(dropLast), keepInvalid).toMetadata()
       } else {
         outputAttrGroupFromSchema.toMetadata()
       }
 
-      encoder(col(inputColName).cast(DoubleType), lit(categorySizes(idx)))
+      encoder(col(inputColName).cast(DoubleType), lit(configedCategorySizes(idx)))
         .as(outputColName, metadata)
     }
     dataset.withColumns($(outputCols), encodedColumns)
@@ -376,7 +413,7 @@ private[feature] object OneHotEncoderCommon {
         }
       case _: NumericAttribute =>
         throw new RuntimeException(
-          s"The input column ${inputCol.name} cannot be numeric.")
+          s"The input column ${inputCol.name} cannot be continuous-valued.")
       case _ =>
         None // optimistic about unknown attributes
     }
@@ -401,8 +438,8 @@ private[feature] object OneHotEncoderCommon {
    */
   def transformOutputColumnSchema(
       inputCol: StructField,
-      dropLast: Boolean,
       outputColName: String,
+      dropLast: Boolean,
       keepInvalid: Boolean = false): StructField = {
     val outputAttrNames = genOutputAttrNames(inputCol)
     val filteredOutputAttrNames = outputAttrNames.map { names =>
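For background on the check in verifyNumOfValues, a hedged sketch of how an AttributeGroup round-trips through a StructField's metadata (the group name and attribute count are illustrative):

import org.apache.spark.ml.attribute.{Attribute, AttributeGroup, BinaryAttribute}

// Two binary attributes, as createAttrGroupForAttrNames would build for two kept categories.
val attrs: Array[Attribute] = Array("0", "1").map(n => BinaryAttribute.defaultAttr.withName(n): Attribute)
val field = new AttributeGroup("output", attrs).toStructField()

// fromStructField recovers the group; size is the attribute count (-1 when unknown),
// which is what gets compared against configedCategorySizes(idx) above.
assert(AttributeGroup.fromStructField(field).size == 2)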
@@ -426,10 +463,9 @@
    */
   def getOutputAttrGroupFromData(
       dataset: Dataset[_],
-      dropLast: Boolean,
       inputColNames: Seq[String],
       outputColNames: Seq[String],
-      handleInvalid: Boolean = false): Seq[AttributeGroup] = {
+      dropLast: Boolean): Seq[AttributeGroup] = {
     // The RDD approach has advantage of early-stop if any values are invalid. It seems that
     // DataFrame ops don't have equivalent functions.
     val columns = inputColNames.map { inputColName =>
@@ -441,31 +477,35 @@ private[feature] object OneHotEncoderCommon {
       (0 until numOfColumns).map(idx => row.getDouble(idx)).toArray
     }.treeAggregate(new Array[Double](numOfColumns))(
       (maxValues, curValues) => {
-        (0 until numOfColumns).map { idx =>
+        (0 until numOfColumns).foreach { idx =>
           val x = curValues(idx)
           assert(x <= Int.MaxValue,
             s"OneHotEncoder only supports up to ${Int.MaxValue} indices, but got $x.")
           assert(x >= 0.0 && x == x.toInt,
             s"Values from column ${inputColNames(idx)} must be indices, but got $x.")
-          math.max(maxValues(idx), x)
-        }.toArray
+          maxValues(idx) = math.max(maxValues(idx), x)
+        }
+        maxValues
       },
       (m0, m1) => {
-        (0 until numOfColumns).map(idx => math.max(m0(idx), m1(idx))).toArray
+        (0 until numOfColumns).foreach { idx =>
+          m0(idx) = math.max(m0(idx), m1(idx))
+        }
+        m0
       }
     ).map(_.toInt + 1)
 
     outputColNames.zip(numAttrsArray).map { case (outputColName, numAttrs) =>
-      createAttrGroupForAttrNames(outputColName, dropLast, numAttrs, handleInvalid)
+      createAttrGroupForAttrNames(outputColName, numAttrs, dropLast, keepInvalid = false)
     }
   }
 
   /** Creates an `AttributeGroup` with the required number of `BinaryAttribute`. */
   def createAttrGroupForAttrNames(
       outputColName: String,
-      dropLast: Boolean,
       numAttrs: Int,
-      keepInvalid: Boolean = false): AttributeGroup = {
+      dropLast: Boolean,
+      keepInvalid: Boolean): AttributeGroup = {
     val outputAttrNames = Array.tabulate(numAttrs)(_.toString)
     val filtered = if (dropLast && !keepInvalid) {
       outputAttrNames.dropRight(1)
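The rewritten aggregation mutates and returns the accumulator instead of allocating a fresh array per record and per merge. A plain-Scala sketch of the same pattern, with folds standing in for RDD.treeAggregate:

val rows = Seq(Array(0.0, 2.0), Array(1.0, 3.0), Array(2.0, 0.0))
val numOfColumns = 2

// seqOp: fold each record into the accumulator in place.
val maxValues = rows.foldLeft(new Array[Double](numOfColumns)) { (acc, cur) =>
  (0 until numOfColumns).foreach { idx => acc(idx) = math.max(acc(idx), cur(idx)) }
  acc
}
val numAttrsArray = maxValues.map(_.toInt + 1) // category count = max index + 1
assert(numAttrsArray.sameElements(Array(3, 4)))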
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/OneHotEncoderEstimatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/OneHotEncoderEstimatorSuite.scala
index 56be70c9941b5..9b9dc435b54f8 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/OneHotEncoderEstimatorSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/OneHotEncoderEstimatorSuite.scala
@@ -19,11 +19,11 @@ package org.apache.spark.ml.feature
 
 import org.apache.spark.{SparkException, SparkFunSuite}
 import org.apache.spark.ml.attribute.{AttributeGroup, BinaryAttribute, NominalAttribute}
-import org.apache.spark.ml.linalg.Vector
+import org.apache.spark.ml.linalg.{Vector, Vectors, VectorUDT}
 import org.apache.spark.ml.param.ParamsSuite
 import org.apache.spark.ml.util.DefaultReadWriteTest
 import org.apache.spark.mllib.util.MLlibTestSparkContext
-import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.{DataFrame, Row}
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.types._
@@ -32,72 +32,67 @@ class OneHotEncoderEstimatorSuite
 
   import testImplicits._
 
-  def stringIndexed(): DataFrame = stringIndexedMultipleCols().select("id", "label", "labelIndex")
-
-  def stringIndexedMultipleCols(): DataFrame = {
-    val data = Seq(
-      (0, "a", "A"),
-      (1, "b", "B"),
-      (2, "c", "D"),
-      (3, "a", "A"),
-      (4, "a", "B"),
-      (5, "c", "C"))
-    val df = data.toDF("id", "label", "label2")
-    val indexer = new StringIndexer()
-      .setInputCol("label")
-      .setOutputCol("labelIndex")
-      .fit(df)
-    val df2 = indexer.transform(df)
-    val indexer2 = new StringIndexer()
-      .setInputCol("label2")
-      .setOutputCol("labelIndex2")
-      .fit(df2)
-    indexer2.transform(df2)
-  }
-
   test("params") {
     ParamsSuite.checkParams(new OneHotEncoderEstimator)
   }
 
   test("OneHotEncoderEstimator dropLast = false") {
-    val transformed = stringIndexed()
+    val data = Seq(
+      Row(0.0, Vectors.sparse(3, Seq((0, 1.0)))),
+      Row(1.0, Vectors.sparse(3, Seq((1, 1.0)))),
+      Row(2.0, Vectors.sparse(3, Seq((2, 1.0)))),
+      Row(0.0, Vectors.sparse(3, Seq((0, 1.0)))),
+      Row(0.0, Vectors.sparse(3, Seq((0, 1.0)))),
+      Row(2.0, Vectors.sparse(3, Seq((2, 1.0)))))
+
+    val schema = StructType(Array(
+        StructField("input", DoubleType),
+        StructField("expected", new VectorUDT)))
+
+    val df = spark.createDataFrame(sc.parallelize(data), schema)
+
     val encoder = new OneHotEncoderEstimator()
-      .setInputCols(Array("labelIndex"))
-      .setOutputCols(Array("labelVec"))
+      .setInputCols(Array("input"))
+      .setOutputCols(Array("output"))
     assert(encoder.getDropLast === true)
     encoder.setDropLast(false)
     assert(encoder.getDropLast === false)
-    val model = encoder.fit(transformed)
-    val encoded = model.transform(transformed)
-
-    val output = encoded.select("id", "labelVec").rdd.map { r =>
-      val vec = r.getAs[Vector](1)
-      (r.getInt(0), vec(0), vec(1), vec(2))
-    }.collect().toSet
-    // a -> 0, b -> 2, c -> 1
-    val expected = Set((0, 1.0, 0.0, 0.0), (1, 0.0, 0.0, 1.0), (2, 0.0, 1.0, 0.0),
-      (3, 1.0, 0.0, 0.0), (4, 1.0, 0.0, 0.0), (5, 0.0, 1.0, 0.0))
-    assert(output === expected)
+    val model = encoder.fit(df)
+    val encoded = model.transform(df)
+    encoded.select("output", "expected").rdd.map { r =>
+      (r.getAs[Vector](0), r.getAs[Vector](1))
+    }.collect().foreach { case (vec1, vec2) =>
+      assert(vec1 === vec2)
+    }
   }
 
   test("OneHotEncoderEstimator dropLast = true") {
-    val transformed = stringIndexed()
+    val data = Seq(
+      Row(0.0, Vectors.sparse(2, Seq((0, 1.0)))),
+      Row(1.0, Vectors.sparse(2, Seq((1, 1.0)))),
+      Row(2.0, Vectors.sparse(2, Seq())),
+      Row(0.0, Vectors.sparse(2, Seq((0, 1.0)))),
+      Row(0.0, Vectors.sparse(2, Seq((0, 1.0)))),
+      Row(2.0, Vectors.sparse(2, Seq())))
+
+    val schema = StructType(Array(
+        StructField("input", DoubleType),
+        StructField("expected", new VectorUDT)))
+
+    val df = spark.createDataFrame(sc.parallelize(data), schema)
+
     val encoder = new OneHotEncoderEstimator()
-      .setInputCols(Array("labelIndex"))
-      .setOutputCols(Array("labelVec"))
-
-    val model = encoder.fit(transformed)
-    val encoded = model.transform(transformed)
-
-    val output = encoded.select("id", "labelVec").rdd.map { r =>
-      val vec = r.getAs[Vector](1)
-      (r.getInt(0), vec(0), vec(1))
-    }.collect().toSet
-    // a -> 0, b -> 2, c -> 1
-    val expected = Set((0, 1.0, 0.0), (1, 0.0, 0.0), (2, 0.0, 1.0),
-      (3, 1.0, 0.0), (4, 1.0, 0.0), (5, 0.0, 1.0))
-    assert(output === expected)
+      .setInputCols(Array("input"))
+      .setOutputCols(Array("output"))
+
+    val model = encoder.fit(df)
+    val encoded = model.transform(df)
+    encoded.select("output", "expected").rdd.map { r =>
+      (r.getAs[Vector](0), r.getAs[Vector](1))
+    }.collect().foreach { case (vec1, vec2) =>
+      assert(vec1 === vec2)
+    }
   }
 
   test("input column with ML attribute") {
df("labelIndex").cast(FloatType)) - .withColumn("decimalLabel", df("labelIndex").cast(DecimalType(10, 0))) - val cols = Array("labelIndex", "shortLabel", "longLabel", "intLabel", - "floatLabel", "decimalLabel") + .withColumn("shortInput", df("input").cast(ShortType)) + .withColumn("longInput", df("input").cast(LongType)) + .withColumn("intInput", df("input").cast(IntegerType)) + .withColumn("floatInput", df("input").cast(FloatType)) + .withColumn("decimalInput", df("input").cast(DecimalType(10, 0))) + + val cols = Array("input", "shortInput", "longInput", "intInput", + "floatInput", "decimalInput") for (col <- cols) { val encoder = new OneHotEncoderEstimator() .setInputCols(Array(col)) - .setOutputCols(Array("labelVec")) + .setOutputCols(Array("output")) .setDropLast(false) + val model = encoder.fit(dfWithTypes) val encoded = model.transform(dfWithTypes) - val output = encoded.select("id", "labelVec").rdd.map { r => - val vec = r.getAs[Vector](1) - (r.getInt(0), vec(0), vec(1), vec(2)) - }.collect().toSet - // a -> 0, b -> 2, c -> 1 - val expected = Set((0, 1.0, 0.0, 0.0), (1, 0.0, 0.0, 1.0), (2, 0.0, 1.0, 0.0), - (3, 1.0, 0.0, 0.0), (4, 1.0, 0.0, 0.0), (5, 0.0, 1.0, 0.0)) - assert(output === expected) + encoded.select("output", "expected").rdd.map { r => + (r.getAs[Vector](0), r.getAs[Vector](1)) + }.collect().foreach { case (vec1, vec2) => + assert(vec1 === vec2) + } } } test("OneHotEncoderEstimator: encoding multiple columns and dropLast = false") { - val transformed = stringIndexedMultipleCols() + val data = Seq( + Row(0.0, Vectors.sparse(3, Seq((0, 1.0))), 2.0, Vectors.sparse(4, Seq((2, 1.0)))), + Row(1.0, Vectors.sparse(3, Seq((1, 1.0))), 3.0, Vectors.sparse(4, Seq((3, 1.0)))), + Row(2.0, Vectors.sparse(3, Seq((2, 1.0))), 0.0, Vectors.sparse(4, Seq((0, 1.0)))), + Row(0.0, Vectors.sparse(3, Seq((0, 1.0))), 1.0, Vectors.sparse(4, Seq((1, 1.0)))), + Row(0.0, Vectors.sparse(3, Seq((0, 1.0))), 0.0, Vectors.sparse(4, Seq((0, 1.0)))), + Row(2.0, Vectors.sparse(3, Seq((2, 1.0))), 2.0, Vectors.sparse(4, Seq((2, 1.0))))) + + val schema = StructType(Array( + StructField("input1", DoubleType), + StructField("expected1", new VectorUDT), + StructField("input2", DoubleType), + StructField("expected2", new VectorUDT))) + + val df = spark.createDataFrame(sc.parallelize(data), schema) + val encoder = new OneHotEncoderEstimator() - .setInputCols(Array("labelIndex", "labelIndex2")) - .setOutputCols(Array("labelVec", "labelVec2")) + .setInputCols(Array("input1", "input2")) + .setOutputCols(Array("output1", "output2")) assert(encoder.getDropLast === true) encoder.setDropLast(false) assert(encoder.getDropLast === false) - val model = encoder.fit(transformed) - val encoded = model.transform(transformed) - - // Verify 1st column. - val output = encoded.select("id", "labelVec").rdd.map { r => - val vec = r.getAs[Vector](1) - (r.getInt(0), vec(0), vec(1), vec(2)) - }.collect().toSet - // a -> 0, b -> 2, c -> 1 - val expected = Set((0, 1.0, 0.0, 0.0), (1, 0.0, 0.0, 1.0), (2, 0.0, 1.0, 0.0), - (3, 1.0, 0.0, 0.0), (4, 1.0, 0.0, 0.0), (5, 0.0, 1.0, 0.0)) - assert(output === expected) - - // Verify 2nd column. 
-    val output2 = encoded.select("id", "labelVec2").rdd.map { r =>
-      val vec = r.getAs[Vector](1)
-      (r.getInt(0), vec(0), vec(1), vec(2), vec(3))
-    }.collect().toSet
-    // A -> 1, B -> 0, C -> 3, D -> 2
-    val expected2 = Set((0, 0.0, 1.0, 0.0, 0.0), (1, 1.0, 0.0, 0.0, 0.0), (2, 0.0, 0.0, 1.0, 0.0),
-      (3, 0.0, 1.0, 0.0, 0.0), (4, 1.0, 0.0, 0.0, 0.0), (5, 0.0, 0.0, 0.0, 1.0))
-    assert(output2 === expected2)
+    val model = encoder.fit(df)
+    val encoded = model.transform(df)
+    encoded.select("output1", "expected1", "output2", "expected2").rdd.map { r =>
+      (r.getAs[Vector](0), r.getAs[Vector](1), r.getAs[Vector](2), r.getAs[Vector](3))
+    }.collect().foreach { case (vec1, vec2, vec3, vec4) =>
+      assert(vec1 === vec2)
+      assert(vec3 === vec4)
+    }
   }
 
   test("OneHotEncoderEstimator: encoding multiple columns and dropLast = true") {
-    val transformed = stringIndexedMultipleCols()
+    val data = Seq(
+      Row(0.0, Vectors.sparse(2, Seq((0, 1.0))), 2.0, Vectors.sparse(3, Seq((2, 1.0)))),
+      Row(1.0, Vectors.sparse(2, Seq((1, 1.0))), 3.0, Vectors.sparse(3, Seq())),
+      Row(2.0, Vectors.sparse(2, Seq()), 0.0, Vectors.sparse(3, Seq((0, 1.0)))),
+      Row(0.0, Vectors.sparse(2, Seq((0, 1.0))), 1.0, Vectors.sparse(3, Seq((1, 1.0)))),
+      Row(0.0, Vectors.sparse(2, Seq((0, 1.0))), 0.0, Vectors.sparse(3, Seq((0, 1.0)))),
+      Row(2.0, Vectors.sparse(2, Seq()), 2.0, Vectors.sparse(3, Seq((2, 1.0)))))
+
+    val schema = StructType(Array(
+        StructField("input1", DoubleType),
+        StructField("expected1", new VectorUDT),
+        StructField("input2", DoubleType),
+        StructField("expected2", new VectorUDT)))
+
+    val df = spark.createDataFrame(sc.parallelize(data), schema)
+
     val encoder = new OneHotEncoderEstimator()
-      .setInputCols(Array("labelIndex", "labelIndex2"))
-      .setOutputCols(Array("labelVec", "labelVec2"))
-
-    val model = encoder.fit(transformed)
-    val encoded = model.transform(transformed)
-
-    // Verify 1st column.
-    val output = encoded.select("id", "labelVec").rdd.map { r =>
-      val vec = r.getAs[Vector](1)
-      (r.getInt(0), vec(0), vec(1))
-    }.collect().toSet
-    // a -> 0, b -> 2, c -> 1
-    val expected = Set((0, 1.0, 0.0), (1, 0.0, 0.0), (2, 0.0, 1.0),
-      (3, 1.0, 0.0), (4, 1.0, 0.0), (5, 0.0, 1.0))
-    assert(output === expected)
-
-    // Verify 2nd column.
-    val output2 = encoded.select("id", "labelVec2").rdd.map { r =>
-      val vec = r.getAs[Vector](1)
-      (r.getInt(0), vec(0), vec(1), vec(2))
-    }.collect().toSet
-    // A -> 1, B -> 0, C -> 3, D -> 2
-    val expected2 = Set((0, 0.0, 1.0, 0.0), (1, 1.0, 0.0, 0.0), (2, 0.0, 0.0, 1.0),
-      (3, 0.0, 1.0, 0.0), (4, 1.0, 0.0, 0.0), (5, 0.0, 0.0, 0.0))
-    assert(output2 === expected2)
+      .setInputCols(Array("input1", "input2"))
+      .setOutputCols(Array("output1", "output2"))
+
+    val model = encoder.fit(df)
+    val encoded = model.transform(df)
+    encoded.select("output1", "expected1", "output2", "expected2").rdd.map { r =>
+      (r.getAs[Vector](0), r.getAs[Vector](1), r.getAs[Vector](2), r.getAs[Vector](3))
+    }.collect().foreach { case (vec1, vec2, vec3, vec4) =>
+      assert(vec1 === vec2)
+      assert(vec3 === vec4)
+    }
   }
 
   test("Throw error on invalid values") {
@@ -250,32 +259,145 @@ class OneHotEncoderEstimatorSuite
     err.getMessage.contains("Unseen value: 3.0. To handle unseen values")
   }
 
-  test("Keep on invalid values") {
-    val trainingData = Seq((0, 0), (1, 1), (2, 2))
-    val trainingDF = trainingData.toDF("id", "a")
-    val testData = Seq((0, 0), (1, 1), (2, 3))
-    val testDF = testData.toDF("id", "a")
+  test("Can't transform on negative input") {
+    val trainingDF = Seq((0, 0), (1, 1), (2, 2)).toDF("a", "b")
+    val testDF = Seq((0, 0), (-1, 2), (1, 3)).toDF("a", "b")
 
-    val dropLasts = Seq(false, true)
-    val expectedOutput = Seq(
-      Set((0, Seq(1.0, 0.0, 0.0, 0.0)), (1, Seq(0.0, 1.0, 0.0, 0.0)), (2, Seq(0.0, 0.0, 0.0, 1.0))),
-      Set((0, Seq(1.0, 0.0, 0.0)), (1, Seq(0.0, 1.0, 0.0)), (2, Seq(0.0, 0.0, 0.0))))
+    val encoder = new OneHotEncoderEstimator()
+      .setInputCols(Array("a"))
+      .setOutputCols(Array("encoded"))
 
-    dropLasts.zipWithIndex.foreach { case (dropLast, idx) =>
-      val encoder = new OneHotEncoderEstimator()
-        .setInputCols(Array("a"))
-        .setOutputCols(Array("encoded"))
-        .setHandleInvalid("keep")
-        .setDropLast(dropLast)
-
-      val model = encoder.fit(trainingDF)
-      val encoded = model.transform(testDF)
-
-      val output = encoded.select("id", "encoded").rdd.map { r =>
-        val vec = r.getAs[Vector](1)
-        (r.getInt(0), vec.toArray.toSeq)
-      }.collect().toSet
-      assert(output === expectedOutput(idx))
+    val model = encoder.fit(trainingDF)
+    val err = intercept[SparkException] {
+      model.transform(testDF).collect()
+    }
+    assert(err.getMessage.contains("Negative value: -1.0. Input can't be negative"))
+  }
+
+  test("Keep on invalid values: dropLast = false") {
+    val trainingDF = Seq(Tuple1(0), Tuple1(1), Tuple1(2)).toDF("input")
+
+    val testData = Seq(
+      Row(0.0, Vectors.sparse(4, Seq((0, 1.0)))),
+      Row(1.0, Vectors.sparse(4, Seq((1, 1.0)))),
+      Row(3.0, Vectors.sparse(4, Seq((3, 1.0)))))
+
+    val schema = StructType(Array(
+        StructField("input", DoubleType),
+        StructField("expected", new VectorUDT)))
+
+    val testDF = spark.createDataFrame(sc.parallelize(testData), schema)
+
+    val encoder = new OneHotEncoderEstimator()
+      .setInputCols(Array("input"))
+      .setOutputCols(Array("output"))
+      .setHandleInvalid("keep")
+      .setDropLast(false)
+
+    val model = encoder.fit(trainingDF)
+    val encoded = model.transform(testDF)
+    encoded.select("output", "expected").rdd.map { r =>
+      (r.getAs[Vector](0), r.getAs[Vector](1))
+    }.collect().foreach { case (vec1, vec2) =>
+      assert(vec1 === vec2)
+    }
+  }
+
+  test("Keep on invalid values: dropLast = true") {
+    val trainingDF = Seq(Tuple1(0), Tuple1(1), Tuple1(2)).toDF("input")
+
+    val testData = Seq(
+      Row(0.0, Vectors.sparse(3, Seq((0, 1.0)))),
+      Row(1.0, Vectors.sparse(3, Seq((1, 1.0)))),
+      Row(3.0, Vectors.sparse(3, Seq())))
+
+    val schema = StructType(Array(
+        StructField("input", DoubleType),
+        StructField("expected", new VectorUDT)))
+
+    val testDF = spark.createDataFrame(sc.parallelize(testData), schema)
+
+    val encoder = new OneHotEncoderEstimator()
+      .setInputCols(Array("input"))
+      .setOutputCols(Array("output"))
+      .setHandleInvalid("keep")
+      .setDropLast(true)
+
+    val model = encoder.fit(trainingDF)
+    val encoded = model.transform(testDF)
+    encoded.select("output", "expected").rdd.map { r =>
+      (r.getAs[Vector](0), r.getAs[Vector](1))
+    }.collect().foreach { case (vec1, vec2) =>
+      assert(vec1 === vec2)
+    }
+  }
+
+  test("OneHotEncoderModel changes dropLast") {
+    val data = Seq(
+      Row(0.0, Vectors.sparse(3, Seq((0, 1.0))), Vectors.sparse(2, Seq((0, 1.0)))),
+      Row(1.0, Vectors.sparse(3, Seq((1, 1.0))), Vectors.sparse(2, Seq((1, 1.0)))),
+      Row(2.0, Vectors.sparse(3, Seq((2, 1.0))), Vectors.sparse(2, Seq())),
+      Row(0.0, Vectors.sparse(3, Seq((0, 1.0))), Vectors.sparse(2, Seq((0, 1.0)))),
+      Row(0.0, Vectors.sparse(3, Seq((0, 1.0))), Vectors.sparse(2, Seq((0, 1.0)))),
+      Row(2.0, Vectors.sparse(3, Seq((2, 1.0))), Vectors.sparse(2, Seq())))
+
+    val schema = StructType(Array(
+        StructField("input", DoubleType),
+        StructField("expected1", new VectorUDT),
+        StructField("expected2", new VectorUDT)))
+
+    val df = spark.createDataFrame(sc.parallelize(data), schema)
+
+    val encoder = new OneHotEncoderEstimator()
+      .setInputCols(Array("input"))
+      .setOutputCols(Array("output"))
+
+    val model = encoder.fit(df)
+
+    model.setDropLast(false)
+    val encoded1 = model.transform(df)
+    encoded1.select("output", "expected1").rdd.map { r =>
+      (r.getAs[Vector](0), r.getAs[Vector](1))
+    }.collect().foreach { case (vec1, vec2) =>
+      assert(vec1 === vec2)
     }
+
+    model.setDropLast(true)
+    val encoded2 = model.transform(df)
+    encoded2.select("output", "expected2").rdd.map { r =>
+      (r.getAs[Vector](0), r.getAs[Vector](1))
+    }.collect().foreach { case (vec1, vec2) =>
+      assert(vec1 === vec2)
+    }
+  }
+
+  test("OneHotEncoderModel changes handleInvalid") {
+    val trainingDF = Seq(Tuple1(0), Tuple1(1), Tuple1(2)).toDF("input")
+
+    val testData = Seq(
+      Row(0.0, Vectors.sparse(4, Seq((0, 1.0)))),
+      Row(1.0, Vectors.sparse(4, Seq((1, 1.0)))),
+      Row(3.0, Vectors.sparse(4, Seq((3, 1.0)))))
+
+    val schema = StructType(Array(
+        StructField("input", DoubleType),
+        StructField("expected", new VectorUDT)))
+
+    val testDF = spark.createDataFrame(sc.parallelize(testData), schema)
+
+    val encoder = new OneHotEncoderEstimator()
+      .setInputCols(Array("input"))
+      .setOutputCols(Array("output"))
+
+    val model = encoder.fit(trainingDF)
+    model.setHandleInvalid("error")
+
+    val err = intercept[SparkException] {
+      model.transform(testDF).show
+    }
+    assert(err.getMessage.contains("Unseen value: 3.0. To handle unseen values"))
+
+    model.setHandleInvalid("keep")
+    model.transform(testDF).collect()
   }
 }
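Taken together, the tests above pin down the runtime behavior that setting dropLast and handleInvalid on a fitted model enables. A hedged end-to-end sketch (assumes a SparkSession named spark with spark.implicits._ imported; column names are illustrative):

import org.apache.spark.ml.feature.OneHotEncoderEstimator

val df = Seq(0.0, 1.0, 2.0).toDF("input")
val model = new OneHotEncoderEstimator()
  .setInputCols(Array("input"))
  .setOutputCols(Array("output"))
  .fit(df)

model.setDropLast(false)
model.transform(df).show() // 3-slot vectors: (3,[0],[1.0]), (3,[1],[1.0]), (3,[2],[1.0])

model.setDropLast(true)
model.transform(df).show() // 2-slot vectors; 2.0 encodes to the all-zeros (2,[],[])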