diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/ChiSqSelector.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/ChiSqSelector.scala
index 653fa41124f88..7ef3d3e750196 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/ChiSqSelector.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/ChiSqSelector.scala
@@ -17,12 +17,14 @@

 package org.apache.spark.ml.feature

+import scala.collection.mutable.ArrayBuilder
+
 import org.apache.hadoop.fs.Path

 import org.apache.spark.annotation.Since
 import org.apache.spark.ml._
 import org.apache.spark.ml.attribute.{AttributeGroup, _}
-import org.apache.spark.ml.linalg.{Vector, VectorUDT}
+import org.apache.spark.ml.linalg._
 import org.apache.spark.ml.param._
 import org.apache.spark.ml.param.shared._
 import org.apache.spark.ml.util._
@@ -228,8 +230,7 @@ final class ChiSqSelectorModel private[ml] (
     val transformedSchema = transformSchema(dataset.schema, logging = true)
     val newField = transformedSchema.last

-    // TODO: Make the transformer natively in ml framework to avoid extra conversion.
-    val transformer: Vector => Vector = v => chiSqSelector.transform(OldVectors.fromML(v)).asML
+    val transformer: Vector => Vector = v => compress(v)

     val selector = udf(transformer)
     dataset.withColumn($(outputCol), selector(col($(featuresCol))), newField.metadata)
@@ -243,6 +244,42 @@ final class ChiSqSelectorModel private[ml] (
     StructType(outputFields)
   }

+  private def compress(features: Vector): Vector = {
+    features match {
+      case SparseVector(_, indices, values) =>
+        val newSize = selectedFeatures.length
+        val newValues = new ArrayBuilder.ofDouble
+        val newIndices = new ArrayBuilder.ofInt
+        var i = 0
+        var j = 0
+        var indicesIdx = 0
+        var filterIndicesIdx = 0
+        while (i < indices.length && j < newSize) {
+          indicesIdx = indices(i)
+          filterIndicesIdx = selectedFeatures(j)
+          if (indicesIdx == filterIndicesIdx) {
+            newIndices += j
+            newValues += values(i)
+            j += 1
+            i += 1
+          } else {
+            if (indicesIdx > filterIndicesIdx) {
+              j += 1
+            } else {
+              i += 1
+            }
+          }
+        }
+        Vectors.sparse(newSize, newIndices.result(), newValues.result())
+      case DenseVector(values) =>
+        val values = features.toArray
+        Vectors.dense(selectedFeatures.map(i => values(i)))
+      case other =>
+        throw new UnsupportedOperationException(
+          s"Only sparse and dense vectors are supported but got ${other.getClass}.")
+    }
+  }
+
   /**
    * Prepare the output column field, including per-feature metadata.
    */
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala
index 6386dd8a10801..cbe09c0e33beb 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala
@@ -21,7 +21,7 @@ import org.apache.hadoop.fs.Path

 import org.apache.spark.annotation.Since
 import org.apache.spark.ml._
-import org.apache.spark.ml.linalg.{Vector, VectorUDT}
+import org.apache.spark.ml.linalg._
 import org.apache.spark.ml.param._
 import org.apache.spark.ml.param.shared._
 import org.apache.spark.ml.util._
@@ -130,8 +130,7 @@ class IDFModel private[ml] (
   @Since("2.0.0")
   override def transform(dataset: Dataset[_]): DataFrame = {
     transformSchema(dataset.schema, logging = true)
-    // TODO: Make the idfModel.transform natively in ml framework to avoid extra conversion.
-    val idf = udf { vec: Vector => idfModel.transform(OldVectors.fromML(vec)).asML }
+    val idf = udf { vec: Vector => IDFModel.transform(idfModel.idf.asML, vec) }
     dataset.withColumn($(outputCol), idf(col($(inputCol))))
   }

@@ -191,4 +190,31 @@ object IDFModel extends MLReadable[IDFModel] {

   @Since("1.6.0")
   override def load(path: String): IDFModel = super.load(path)
+
+  // Scale each term of the input vector by its inverse document frequency.
+  private def transform(idf: Vector, v: Vector): Vector = {
+    val newSize = v.size
+    v match {
+      case SparseVector(_, indices, values) =>
+        val nnz = indices.length
+        val newValues = new Array[Double](nnz)
+        var k = 0
+        while (k < nnz) {
+          newValues(k) = values(k) * idf(indices(k))
+          k += 1
+        }
+        Vectors.sparse(newSize, indices, newValues)
+      case DenseVector(values) =>
+        val newValues = new Array[Double](newSize)
+        var j = 0
+        while (j < newSize) {
+          newValues(j) = values(j) * idf(j)
+          j += 1
+        }
+        Vectors.dense(newValues)
+      case other =>
+        throw new UnsupportedOperationException(
+          s"Only sparse and dense vectors are supported but got ${other.getClass}.")
+    }
+  }
 }
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala
index 6b913480fdc28..fb8cf1541077c 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala
@@ -150,13 +150,26 @@ class PCAModel private[ml] (
       OldMatrices.fromML(pc).asInstanceOf[OldDenseMatrix],
       OldVectors.fromML(explainedVariance).asInstanceOf[OldDenseVector])

-    // TODO: Make the transformer natively in ml framework to avoid extra conversion.
-    val transformer: Vector => Vector = v => pcaModel.transform(OldVectors.fromML(v)).asML
+    val transformer: Vector => Vector = v => transform(pcaModel.pc.asML, v)

     val pcaOp = udf(transformer)
     dataset.withColumn($(outputCol), pcaOp(col($(inputCol))))
   }

+  private def transform(pc: DenseMatrix, vector: Vector): Vector = {
+    vector match {
+      case dv: DenseVector =>
+        pc.transpose.multiply(dv)
+      case SparseVector(size, indices, values) =>
+        val sm = Matrices.sparse(size, 1, Array(0, indices.length), indices, values).transpose
+        val projection = sm.multiply(pc)
+        Vectors.dense(projection.values)
+      case _ =>
+        throw new IllegalArgumentException("Unsupported vector format. Expected " +
+          s"SparseVector or DenseVector. Instead got: ${vector.getClass}")
+    }
+  }
+
   @Since("1.5.0")
   override def transformSchema(schema: StructType): StructType = {
     validateAndTransformSchema(schema)
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala
index d76d556280e96..16f9d3bba6831 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala
@@ -21,7 +21,7 @@ import org.apache.hadoop.fs.Path

 import org.apache.spark.annotation.Since
 import org.apache.spark.ml._
-import org.apache.spark.ml.linalg.{Vector, VectorUDT}
+import org.apache.spark.ml.linalg._
 import org.apache.spark.ml.param._
 import org.apache.spark.ml.param.shared._
 import org.apache.spark.ml.util._
@@ -162,13 +162,71 @@ class StandardScalerModel private[ml] (
     transformSchema(dataset.schema, logging = true)
     val scaler = new feature.StandardScalerModel(std, mean, $(withStd), $(withMean))

-    // TODO: Make the transformer natively in ml framework to avoid extra conversion.
-    val transformer: Vector => Vector = v => scaler.transform(OldVectors.fromML(v)).asML
+    val transformer: Vector => Vector = v =>
+      transform(scaler.mean, v)

     val scale = udf(transformer)
     dataset.withColumn($(outputCol), scale(col($(inputCol))))
   }

+  private def transform(mean: Vector, vector: Vector): Vector = {
+    require(mean.size == vector.size)
+    if (getWithMean) {
+      // By default, Scala generates Java methods for member variables, so every time a
+      // member variable is accessed a method call (`invokespecial`) is made, which is expensive.
+      // This can be avoided by keeping a local reference to the shift values (`localShift`).
+      val localShift = mean.toArray
+      // Must have a copy of the values since they will be modified in place.
+      val values = vector match {
+        // Handle DenseVector specially because its toArray does not make a copy.
+        case d: DenseVector => d.values.clone()
+        case v: Vector => v.toArray
+      }
+      val newSize = values.length
+      if (getWithStd) {
+        var i = 0
+        while (i < newSize) {
+          values(i) = if (std(i) != 0.0) (values(i) - localShift(i)) * (1.0 / std(i)) else 0.0
+          i += 1
+        }
+      } else {
+        var i = 0
+        while (i < newSize) {
+          values(i) -= localShift(i)
+          i += 1
+        }
+      }
+      Vectors.dense(values)
+    } else if (getWithStd) {
+      vector match {
+        case DenseVector(vs) =>
+          val values = vs.clone()
+          val size = values.length
+          var i = 0
+          while (i < size) {
+            values(i) *= (if (std(i) != 0.0) 1.0 / std(i) else 0.0)
+            i += 1
+          }
+          Vectors.dense(values)
+        case SparseVector(size, indices, vs) =>
+          // For a sparse vector, the `indices` array inside the vector will not be changed,
+          // so we can re-use it to save memory.
+          val values = vs.clone()
+          val nnz = values.length
+          var i = 0
+          while (i < nnz) {
+            values(i) *= (if (std(indices(i)) != 0.0) 1.0 / std(indices(i)) else 0.0)
+            i += 1
+          }
+          Vectors.sparse(size, indices, values)
+        case v => throw new IllegalArgumentException("Unsupported vector type " + v.getClass)
+      }
+    } else {
+      // Note that returning the input is safe since data in an RDD is assumed to be immutable.
+      vector
+    }
+  }
+
   @Since("1.4.0")
   override def transformSchema(schema: StructType): StructType = {
     validateAndTransformSchema(schema)
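
Not part of the patch: the standalone sketch below (plain Scala, needing only org.apache.spark.ml.linalg on the classpath; the object name CompressSketch and the hard-coded selected indices are illustrative) mirrors the two-pointer index walk used by the new ChiSqSelectorModel.compress, so the selection logic can be sanity-checked on a toy vector without a SparkSession. The merge-style walk is what keeps the sparse path at O(nnz + numSelected) instead of densifying the vector.

import scala.collection.mutable.ArrayBuilder

import org.apache.spark.ml.linalg.{SparseVector, Vector, Vectors}

object CompressSketch {
  // Keep only the entries of `features` whose original index appears in the sorted
  // `selectedFeatures`, re-indexed into a vector of size selectedFeatures.length.
  def compress(features: Vector, selectedFeatures: Array[Int]): Vector = features match {
    case SparseVector(_, indices, values) =>
      val newSize = selectedFeatures.length
      val newIndices = new ArrayBuilder.ofInt
      val newValues = new ArrayBuilder.ofDouble
      var i = 0
      var j = 0
      while (i < indices.length && j < newSize) {
        if (indices(i) == selectedFeatures(j)) {
          newIndices += j
          newValues += values(i)
          i += 1
          j += 1
        } else if (indices(i) > selectedFeatures(j)) {
          j += 1 // selected feature is absent from the sparse vector: implicitly zero
        } else {
          i += 1 // stored feature was not selected: drop it
        }
      }
      Vectors.sparse(newSize, newIndices.result(), newValues.result())
    case dense =>
      val values = dense.toArray
      Vectors.dense(selectedFeatures.map(i => values(i)))
  }

  def main(args: Array[String]): Unit = {
    val selected = Array(0, 2, 3) // must be in ascending order, as in the fitted model
    println(compress(Vectors.sparse(5, Array(1, 2, 4), Array(7.0, 8.0, 9.0)), selected)) // (3,[1],[8.0])
    println(compress(Vectors.dense(1.0, 7.0, 8.0, 0.0, 9.0), selected))                  // [1.0,8.0,0.0]
  }
}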