Skip to content

Commit

Permalink
[SPARK-23046][ML][SPARKR] Have RFormula include VectorSizeHint in pip…
Browse files Browse the repository at this point in the history
…eline

## What changes were proposed in this pull request?

Including VectorSizeHint in RFormula piplelines will allow them to be applied to streaming dataframes.

## How was this patch tested?

Unit tests.

Author: Bago Amirbekian <[email protected]>

Closes #20238 from MrBago/rFormulaVectorSize.
  • Loading branch information
MrBago authored and jkbradley committed Jan 11, 2018
1 parent 6f7aaed commit 186bf8f
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 8 deletions.
1 change: 1 addition & 0 deletions R/pkg/R/mllib_utils.R
Original file line number Diff line number Diff line change
Expand Up @@ -130,3 +130,4 @@ read.ml <- function(path) {
stop("Unsupported model: ", jobj)
}
}

18 changes: 15 additions & 3 deletions mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.annotation.{Experimental, Since}
import org.apache.spark.ml.{Estimator, Model, Pipeline, PipelineModel, PipelineStage, Transformer}
import org.apache.spark.ml.attribute.AttributeGroup
import org.apache.spark.ml.linalg.VectorUDT
import org.apache.spark.ml.linalg.{Vector, VectorUDT}
import org.apache.spark.ml.param.{BooleanParam, Param, ParamMap, ParamValidators}
import org.apache.spark.ml.param.shared.{HasFeaturesCol, HasHandleInvalid, HasLabelCol}
import org.apache.spark.ml.util._
Expand Down Expand Up @@ -210,8 +210,8 @@ class RFormula @Since("1.5.0") (@Since("1.5.0") override val uid: String)

// First we index each string column referenced by the input terms.
val indexed: Map[String, String] = resolvedFormula.terms.flatten.distinct.map { term =>
dataset.schema(term) match {
case column if column.dataType == StringType =>
dataset.schema(term).dataType match {
case _: StringType =>
val indexCol = tmpColumn("stridx")
encoderStages += new StringIndexer()
.setInputCol(term)
Expand All @@ -220,6 +220,18 @@ class RFormula @Since("1.5.0") (@Since("1.5.0") override val uid: String)
.setHandleInvalid($(handleInvalid))
prefixesToRewrite(indexCol + "_") = term + "_"
(term, indexCol)
case _: VectorUDT =>
val group = AttributeGroup.fromStructField(dataset.schema(term))
val size = if (group.size < 0) {
dataset.select(term).first().getAs[Vector](0).size
} else {
group.size
}
encoderStages += new VectorSizeHint(uid)
.setHandleInvalid("optimistic")
.setInputCol(term)
.setSize(size)
(term, term)
case _ =>
(term, term)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@

package org.apache.spark.ml.feature

import org.apache.spark.{SparkException, SparkFunSuite}
import org.apache.spark.SparkException
import org.apache.spark.ml.attribute._
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.param.ParamsSuite
import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils}
import org.apache.spark.mllib.util.MLlibTestSparkContext
import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest, MLTestingUtils}
import org.apache.spark.sql.{DataFrame, Encoder, Row}
import org.apache.spark.sql.types.DoubleType

class RFormulaSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
class RFormulaSuite extends MLTest with DefaultReadWriteTest {

import testImplicits._

Expand Down Expand Up @@ -548,4 +548,31 @@ class RFormulaSuite extends SparkFunSuite with MLlibTestSparkContext with Defaul
assert(result3.collect() === expected3.collect())
assert(result4.collect() === expected4.collect())
}

test("Use Vectors as inputs to formula.") {
val original = Seq(
(1, 4, Vectors.dense(0.0, 0.0, 4.0)),
(2, 4, Vectors.dense(1.0, 0.0, 4.0)),
(3, 5, Vectors.dense(1.0, 0.0, 5.0)),
(4, 5, Vectors.dense(0.0, 1.0, 5.0))
).toDF("id", "a", "b")
val formula = new RFormula().setFormula("id ~ a + b")
val (first +: rest) = Seq("id", "a", "b", "features", "label")
testTransformer[(Int, Int, Vector)](original, formula.fit(original), first, rest: _*) {
case Row(id: Int, a: Int, b: Vector, features: Vector, label: Double) =>
assert(label === id)
assert(features.toArray === a +: b.toArray)
}

val group = new AttributeGroup("b", 3)
val vectorColWithMetadata = original("b").as("b", group.toMetadata())
val dfWithMetadata = original.withColumn("b", vectorColWithMetadata)
val model = formula.fit(dfWithMetadata)
// model should work even when applied to dataframe without metadata.
testTransformer[(Int, Int, Vector)](original, model, first, rest: _*) {
case Row(id: Int, a: Int, b: Vector, features: Vector, label: Double) =>
assert(label === id)
assert(features.toArray === a +: b.toArray)
}
}
}

0 comments on commit 186bf8f

Please sign in to comment.