From b6b56fa1654fa04c0092e5166a129ae45aaa4d80 Mon Sep 17 00:00:00 2001
From: Le-Zheng <30695225+Le-Zheng@users.noreply.github.com>
Date: Thu, 24 Jun 2021 09:44:41 +0800
Subject: [PATCH] fix dlframe (#3133)

---
 .../bigdl/dlframes/DLClassifier.scala | 94 ++++
 .../bigdl/dlframes/DLEstimator.scala | 446 ++++++++++++++++++
 .../bigdl/dlframes/DLImageReader.scala | 144 ++++++
 .../bigdl/dlframes/DLImageTransformer.scala | 96 ++++
 .../bigdl/dlframes/SharedParamsAdapter.scala | 53 +++
 .../optim/dlframes/DLClassifierSpec.scala | 247 ++++++++++
 .../optim/dlframes/DLEstimatorSpec.scala | 348 ++++++++++++++
 .../optim/dlframes/DLImageReaderSpec.scala | 118 +++++
 .../dlframes/DLImageTransformerSpec.scala | 138 ++++++
 9 files changed, 1684 insertions(+)
 create mode 100644 spark/dl/src/main/scala/com/intel/analytics/bigdl/dlframes/DLClassifier.scala
 create mode 100644 spark/dl/src/main/scala/com/intel/analytics/bigdl/dlframes/DLEstimator.scala
 create mode 100644 spark/dl/src/main/scala/com/intel/analytics/bigdl/dlframes/DLImageReader.scala
 create mode 100644 spark/dl/src/main/scala/com/intel/analytics/bigdl/dlframes/DLImageTransformer.scala
 create mode 100644 spark/dl/src/main/scala/com/intel/analytics/bigdl/dlframes/SharedParamsAdapter.scala
 create mode 100644 spark/dl/src/test/scala/com/intel/analytics/bigdl/optim/dlframes/DLClassifierSpec.scala
 create mode 100644 spark/dl/src/test/scala/com/intel/analytics/bigdl/optim/dlframes/DLEstimatorSpec.scala
 create mode 100644 spark/dl/src/test/scala/com/intel/analytics/bigdl/optim/dlframes/DLImageReaderSpec.scala
 create mode 100644 spark/dl/src/test/scala/com/intel/analytics/bigdl/optim/dlframes/DLImageTransformerSpec.scala

diff --git a/spark/dl/src/main/scala/com/intel/analytics/bigdl/dlframes/DLClassifier.scala b/spark/dl/src/main/scala/com/intel/analytics/bigdl/dlframes/DLClassifier.scala
new file mode 100644
index 00000000000..e57ee03d168
--- /dev/null
+++ b/spark/dl/src/main/scala/com/intel/analytics/bigdl/dlframes/DLClassifier.scala
@@ -0,0 +1,94 @@
+/*
+ * Copyright 2016 The BigDL Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.intel.analytics.bigdl.dlframes
+
+import com.intel.analytics.bigdl.tensor.Tensor
+import com.intel.analytics.bigdl.tensor.TensorNumericMath.TensorNumeric
+import com.intel.analytics.bigdl.{Criterion, Module}
+import org.apache.spark.ml.adapter.SchemaUtils
+import org.apache.spark.ml.param.ParamMap
+import org.apache.spark.ml.util.Identifiable
+import org.apache.spark.sql.types._
+
+import scala.reflect.ClassTag
+
+/**
+ * [[DLClassifier]] is a specialized [[DLEstimator]] that simplifies the data format for
+ * classification tasks. It only supports a label column of DoubleType, and the fitted
+ * [[DLClassifierModel]] will have the prediction column of DoubleType.
+ *
+ * @param model BigDL module to be optimized
+ * @param criterion BigDL criterion method
+ * @param featureSize The size (Tensor dimensions) of the feature data.
+ */
+@deprecated("`DLClassifier` is deprecated."
+ + "com.intel.analytics.bigdl.dlframes is deprecated in BigDL 0.11, " + + "and will be removed in future releases", "0.10.0") +class DLClassifier[T: ClassTag]( + @transient override val model: Module[T], + override val criterion : Criterion[T], + override val featureSize : Array[Int], + override val uid: String = Identifiable.randomUID("dlClassifier") + )(implicit ev: TensorNumeric[T]) + extends DLEstimator[T](model, criterion, featureSize, Array(1)) { + + override protected def wrapBigDLModel( + m: Module[T], featureSize: Array[Int]): DLClassifierModel[T] = { + val dlModel = new DLClassifierModel[T](m, featureSize) + copyValues(dlModel.setParent(this)).asInstanceOf[DLClassifierModel[T]] + } + + override def transformSchema(schema : StructType): StructType = { + validateParams(schema) + SchemaUtils.appendColumn(schema, $(predictionCol), DoubleType) + } + + override def copy(extra: ParamMap): DLClassifier[T] = { + copyValues(new DLClassifier(model, criterion, featureSize), extra) + } +} + +/** + * [[DLClassifierModel]] is a specialized [[DLModel]] for classification tasks. + * The prediction column will have the datatype of Double. + * + * @param model BigDL module to be optimized + * @param featureSize The size (Tensor dimensions) of the feature data. + */ +@deprecated("`DLClassifierModel` is deprecated." + + "com.intel.analytics.bigdl.dlframes is deprecated in BigDL 0.11, " + + "and will be removed in future releases", "0.10.0") +class DLClassifierModel[T: ClassTag]( + @transient override val model: Module[T], + featureSize : Array[Int], + override val uid: String = "DLClassifierModel" + )(implicit ev: TensorNumeric[T]) extends DLModel[T](model, featureSize) { + + protected override def outputToPrediction(output: Tensor[T]): Any = { + if (output.size().deep == Array(1).deep) { + val raw = ev.toType[Double](output.toArray().head) + if (raw > 0.5) 1.0 else 0.0 + } else { + ev.toType[Double](output.max(1)._2.valueAt(1)) + } + } + + override def transformSchema(schema : StructType): StructType = { + validateDataType(schema, $(featuresCol)) + SchemaUtils.appendColumn(schema, $(predictionCol), DoubleType) + } +} + diff --git a/spark/dl/src/main/scala/com/intel/analytics/bigdl/dlframes/DLEstimator.scala b/spark/dl/src/main/scala/com/intel/analytics/bigdl/dlframes/DLEstimator.scala new file mode 100644 index 00000000000..9441be959f5 --- /dev/null +++ b/spark/dl/src/main/scala/com/intel/analytics/bigdl/dlframes/DLEstimator.scala @@ -0,0 +1,446 @@ +/* + * Copyright 2016 The BigDL Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.intel.analytics.bigdl.dlframes + +import com.intel.analytics.bigdl.{Criterion, Module} +import com.intel.analytics.bigdl.dataset._ +import com.intel.analytics.bigdl.models.utils.ModelBroadcast +import com.intel.analytics.bigdl.optim._ +import com.intel.analytics.bigdl.tensor.TensorNumericMath.TensorNumeric +import com.intel.analytics.bigdl.tensor.{Storage, Tensor} +import com.intel.analytics.bigdl.utils.T +import com.intel.analytics.bigdl.visualization.{TrainSummary, ValidationSummary} +import org.apache.spark.ml.adapter.{HasFeaturesCol, HasPredictionCol, SchemaUtils} +import org.apache.spark.ml.{DLEstimatorBase, DLTransformerBase, VectorCompatibility} +import org.apache.spark.ml.param.{IntParam, ParamMap, ParamValidators, _} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.types._ +import org.apache.spark.sql.{DataFrame, Row} + +import scala.reflect.ClassTag + +private[dlframes] trait HasBatchSize extends Params { + + final val batchSize: Param[Int] = new Param[Int](this, "batchSize", "batchSize") + + def getBatchSize: Int = $(batchSize) +} + +/** + * Common trait for DLEstimator and DLModel + */ +private[dlframes] trait DLParams[@specialized(Float, Double) T] extends HasFeaturesCol + with HasPredictionCol with VectorCompatibility with HasBatchSize { + + /** + * When to stop the training, passed in a [[Trigger]]. E.g. Trigger.maxIterations + */ + final val endWhen = new Param[Trigger](this, "endWhen", "Trigger to stop the training") + + def getEndWhen: Trigger = $(endWhen) + + /** + * learning rate for the optimizer in the DLEstimator. + * Default: 0.001 + */ + final val learningRate = new DoubleParam( + this, "learningRate", "learningRate", ParamValidators.gt(0)) + + def getLearningRate: Double = $(learningRate) + + /** + * learning rate decay for each iteration. + * Default: 0 + */ + final val learningRateDecay = new DoubleParam(this, "learningRateDecay", "learningRateDecay") + + def getLearningRateDecay: Double = $(learningRateDecay) + + /** + * Number of max Epoch for the training, an epoch refers to a traverse over the training data + * Default: 50 + */ + final val maxEpoch = new IntParam(this, "maxEpoch", "number of max Epoch", ParamValidators.gt(0)) + + def getMaxEpoch: Int = $(maxEpoch) + + /** + * optimization method to be used. BigDL supports many optimization methods like Adam, + * SGD and LBFGS. Refer to package com.intel.analytics.bigdl.optim for all the options. + * Default: SGD + */ + final val optimMethod = new Param[OptimMethod[T]](this, "optimMethod", "optimMethod") + + def getOptimMethod: OptimMethod[T] = $(optimMethod) + + setDefault(batchSize -> 1) + + /** + * Validate if feature and label columns are of supported data types. 
+   */
+  protected def validateDataType(schema: StructType, colName: String): Unit = {
+    val dataTypes = Seq(
+      new ArrayType(DoubleType, false),
+      new ArrayType(DoubleType, true),
+      new ArrayType(FloatType, false),
+      new ArrayType(FloatType, true),
+      DoubleType,
+      FloatType
+    ) ++ validVectorTypes
+
+    // TODO use SchemaUtils.checkColumnTypes after convert to 2.0
+    val actualDataType = schema(colName).dataType
+    require(dataTypes.exists(actualDataType.equals),
+      s"Column $colName must be of type equal to one of the following types: " +
+        s"${dataTypes.mkString("[", ", ", "]")} but was actually of type $actualDataType.")
+  }
+
+  /**
+   * Get the conversion function to extract data from the original DataFrame.
+   */
+  protected def getConvertFunc(colType: DataType): (Row, Int) => Seq[AnyVal] = {
+    colType match {
+      case ArrayType(DoubleType, false) =>
+        (row: Row, index: Int) => row.getSeq[Double](index)
+      case ArrayType(DoubleType, true) =>
+        (row: Row, index: Int) => row.getSeq[Double](index)
+      case ArrayType(FloatType, false) =>
+        (row: Row, index: Int) => row.getSeq[Float](index)
+      case ArrayType(FloatType, true) =>
+        (row: Row, index: Int) => row.getSeq[Float](index)
+      case DoubleType =>
+        (row: Row, index: Int) => Seq[Double](row.getDouble(index))
+      case FloatType =>
+        (row: Row, index: Int) => Seq[Float](row.getFloat(index))
+      case _ =>
+        if (colType.typeName.contains("vector")) {
+          (row: Row, index: Int) => getVectorSeq(row, colType, index)
+        } else {
+          throw new IllegalArgumentException(
+            s"$colType is not a supported data type (unexpected path).")
+        }
+    }
+  }
+}
+
+
+/**
+ * [[DLEstimator]] helps to train a BigDL Model with the Spark ML Estimator/Transformer pattern,
+ * thus Spark users can conveniently fit BigDL into a Spark ML pipeline.
+ *
+ * [[DLEstimator]] supports feature and label data in the format of
+ * Array[Double], Array[Float], org.apache.spark.mllib.linalg.{Vector, VectorUDT},
+ * org.apache.spark.ml.linalg.{Vector, VectorUDT}, Double and Float.
+ *
+ * Users should specify the feature data dimensions and label data dimensions via the constructor
+ * parameters featureSize and labelSize respectively. Internally the feature and label data are
+ * converted to BigDL tensors, to further train a BigDL model efficiently.
+ *
+ * For detailed usage, please refer to the examples in package
+ * com.intel.analytics.bigdl.example.MLPipeline
+ *
+ * @param model BigDL module to be optimized
+ * @param criterion BigDL criterion method
+ * @param featureSize The size (Tensor dimensions) of the feature data. e.g. for an image with
+ *                    width * height = 28 * 28, featureSize = Array(28, 28).
+ * @param labelSize The size (Tensor dimensions) of the label data.
+ */
+@deprecated("`DLEstimator` is deprecated."
+ + "com.intel.analytics.bigdl.dlframes is deprecated in BigDL 0.11, " + + "and will be removed in future releases", "0.10.0") +class DLEstimator[@specialized(Float, Double) T: ClassTag]( + @transient val model: Module[T], + val criterion : Criterion[T], + val featureSize : Array[Int], + val labelSize : Array[Int], + override val uid: String = "DLEstimator")(implicit ev: TensorNumeric[T]) + extends DLEstimatorBase[DLEstimator[T], DLModel[T]] with DLParams[T] { + + def setFeaturesCol(featuresColName: String): this.type = set(featuresCol, featuresColName) + + def setLabelCol(labelColName : String) : this.type = set(labelCol, labelColName) + + def setPredictionCol(value: String): this.type = set(predictionCol, value) + + def setBatchSize(value: Int): this.type = set(batchSize, value) + + def setEndWhen(trigger: Trigger): this.type = set(endWhen, trigger) + + def setLearningRate(value: Double): this.type = set(learningRate, value) + setDefault(learningRate -> 1e-3) + + def setLearningRateDecay(value: Double): this.type = set(learningRateDecay, value) + setDefault(learningRateDecay -> 0.0) + + def setMaxEpoch(value: Int): this.type = set(maxEpoch, value) + setDefault(maxEpoch -> 50) + + def setOptimMethod(value: OptimMethod[T]): this.type = set(optimMethod, value) + set(optimMethod, new SGD[T]) + + @transient private var trainSummary: Option[TrainSummary] = None + + def getTrainSummary: Option[TrainSummary] = trainSummary + + /** + * Statistics (LearningRate, Loss, Throughput, Parameters) collected during training for the + * training data, which can be used for visualization via Tensorboard. + * Use setTrainSummary to enable train logger. Then the log will be saved to + * logDir/appName/train as specified by the parameters of TrainSummary. + * + * Default: Not enabled + */ + def setTrainSummary(value: TrainSummary): this.type = { + this.trainSummary = Some(value) + this + } + + @transient private var validationSummary: Option[ValidationSummary] = None + + /** + * Statistics (LearningRate, Loss, Throughput, Parameters) collected during training for the + * validation data if validation data is set, which can be used for visualization via + * Tensorboard. Use setValidationSummary to enable validation logger. Then the log will be + * saved to logDir/appName/ as specified by the parameters of validationSummary. 
+   *
+   * Default: None
+   */
+  def getValidationSummary: Option[ValidationSummary] = validationSummary
+
+  /**
+   * Enable validation summary.
+   */
+  def setValidationSummary(value: ValidationSummary): this.type = {
+    this.validationSummary = Some(value)
+    this
+  }
+
+  @transient private var validationTrigger: Option[Trigger] = None
+  @transient private var validationDF: DataFrame = _
+  @transient private var validationMethods: Array[ValidationMethod[T]] = _
+  @transient private var validationBatchSize: Int = 0
+  /**
+   * Set validation evaluation during training.
+   *
+   * @param trigger how often to evaluate the validation set
+   * @param validationDF validation data set
+   * @param vMethods a set of validation methods [[ValidationMethod]]
+   * @param batchSize batch size for validation
+   * @return this estimator
+   */
+  def setValidation(trigger: Trigger, validationDF: DataFrame,
+    vMethods : Array[ValidationMethod[T]], batchSize: Int)
+  : this.type = {
+    this.validationTrigger = Some(trigger)
+    this.validationDF = validationDF
+    this.validationMethods = vMethods
+    this.validationBatchSize = batchSize
+    this
+  }
+
+  protected def validateParams(schema : StructType): Unit = {
+    validateDataType(schema, $(featuresCol))
+    validateDataType(schema, $(labelCol))
+    if (isSet(endWhen) && isSet(maxEpoch)) {
+      throw new IllegalArgumentException(s"endWhen and maxEpoch cannot be both set")
+    }
+    if (validationTrigger.isEmpty && validationSummary.isDefined) {
+      throw new IllegalArgumentException(
+        s"validationSummary is only valid if validation data is set.")
+    }
+  }
+
+  override def transformSchema(schema : StructType): StructType = {
+    validateParams(schema)
+    SchemaUtils.appendColumn(schema, $(predictionCol), ArrayType(DoubleType, false))
+  }
+
+  protected override def internalFit(dataFrame: DataFrame): DLModel[T] = {
+    val localFeatureCol = $(featuresCol)
+    val localLabelCol = $(labelCol)
+
+    def getSamples(dataFrame: DataFrame): RDD[Sample[T]] = {
+      val featureType = dataFrame.schema(localFeatureCol).dataType
+      val featureColIndex = dataFrame.schema.fieldIndex(localFeatureCol)
+      val labelType = dataFrame.schema(localLabelCol).dataType
+      val labelColIndex = dataFrame.schema.fieldIndex(localLabelCol)
+
+      val featureFunc = getConvertFunc(featureType)
+      val labelFunc = getConvertFunc(labelType)
+
+      val featureAndLabel: RDD[(Seq[AnyVal], Seq[AnyVal])] = dataFrame.rdd.map { row =>
+        val features = featureFunc(row, featureColIndex)
+        val labels = labelFunc(row, labelColIndex)
+        (features, labels)
+      }
+
+      val samples = featureAndLabel.map { case (f, l) =>
+        // convert feature and label data to the same numeric type as the model
+        // TODO: investigate to reduce memory consumption during conversion.
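+        // peeking at f.head/l.head is safe here because getConvertFunc always
+        // returns a homogeneous Seq[Double] or Seq[Float]; both branches then
+        // convert element-wise into the model's numeric type T via ev.fromType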
+        val feature = f.head match {
+          case dd: Double => f.asInstanceOf[Seq[Double]].map(ev.fromType(_))
+          case ff: Float => f.asInstanceOf[Seq[Float]].map(ev.fromType(_))
+        }
+        val label = l.head match {
+          case dd: Double => l.asInstanceOf[Seq[Double]].map(ev.fromType(_))
+          case ff: Float => l.asInstanceOf[Seq[Float]].map(ev.fromType(_))
+        }
+        (feature, label)
+      }.map { case (feature, label) =>
+        Sample(Tensor(feature.toArray, featureSize), Tensor(label.toArray, labelSize))
+      }
+      samples
+    }
+
+    val trainingSamples = getSamples(dataFrame)
+    val state = T("learningRate" -> $(learningRate), "learningRateDecay" -> $(learningRateDecay))
+    val endTrigger = if (isSet(endWhen)) $(endWhen) else Trigger.maxEpoch($(maxEpoch))
+    val optimizer = Optimizer(model, trainingSamples, criterion, $(batchSize))
+      .setState(state)
+      .setOptimMethod($(optimMethod))
+      .setEndWhen(endTrigger)
+
+    if (validationTrigger.isDefined) {
+      val validationSamples = getSamples(validationDF)
+      optimizer.setValidation(
+        validationTrigger.get,
+        validationSamples,
+        validationMethods,
+        validationBatchSize)
+      if (this.validationSummary.isDefined) {
+        optimizer.setValidationSummary(this.validationSummary.get)
+      }
+    }
+
+    if (this.trainSummary.isDefined) {
+      optimizer.setTrainSummary(this.trainSummary.get)
+    }
+
+    val optimizedModel = optimizer.optimize()
+    wrapBigDLModel(optimizedModel, featureSize)
+  }
+
+  /**
+   * Subclasses can override this method to return the required model for different
+   * transform tasks.
+   */
+  protected def wrapBigDLModel(m: Module[T], featureSize: Array[Int]): DLModel[T] = {
+    val dlModel = new DLModel[T](m, featureSize)
+    copyValues(dlModel.setParent(this))
+  }
+
+  override def copy(extra: ParamMap): DLEstimator[T] = {
+    copyValues(new DLEstimator(model, criterion, featureSize, labelSize), extra)
+  }
+}
+
+/**
+ * [[DLModel]] helps embed a BigDL model into a Spark Transformer, thus Spark users can
+ * conveniently merge BigDL into a Spark ML pipeline.
+ * [[DLModel]] supports feature data in the format of
+ * Array[Double], Array[Float], org.apache.spark.mllib.linalg.{Vector, VectorUDT},
+ * org.apache.spark.ml.linalg.{Vector, VectorUDT}, Double and Float.
+ * Internally [[DLModel]] uses the features column as storage of the feature data, and creates
+ * Tensors according to the constructor parameter featureSize.
+ *
+ * [[DLModel]] is compatible with both Spark 1.5+ and 2.0 by extending ML Transformer.
+ * @param model trained BigDL model to use for prediction.
+ * @param featureSize The size (Tensor dimensions) of the feature data. (e.g. for a 28 * 28
+ *                    image, featureSize = Array(28, 28)).
+ */
+@deprecated("`DLModel` is deprecated." +
+  "com.intel.analytics.bigdl.dlframes is deprecated in BigDL 0.11, " +
+  "and will be removed in future releases", "0.10.0")
+class DLModel[@specialized(Float, Double) T: ClassTag](
+    @transient val model: Module[T],
+    var featureSize : Array[Int],
+    override val uid: String = "DLModel"
+  )(implicit ev: TensorNumeric[T])
+  extends DLTransformerBase[DLModel[T]] with DLParams[T] with HasBatchSize {
+
+  def setFeaturesCol(featuresColName: String): this.type = set(featuresCol, featuresColName)
+
+  def setPredictionCol(value: String): this.type = set(predictionCol, value)
+
+  def setFeatureSize(value: Array[Int]): this.type = {
+    this.featureSize = value
+    this
+  }
+
+  def setBatchSize(value: Int): this.type = set(batchSize, value)
+
+  def getFeatureSize: Array[Int] = this.featureSize
+
+  /**
+   * Perform a prediction on featureCol, and write the result to the predictionCol.
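+   * For example (editor's sketch; `trainedModule` and `testDF` are placeholder names):
+   * {{{
+   *   val dlModel = new DLModel[Float](trainedModule, Array(28, 28)).setBatchSize(4)
+   *   val predicted = dlModel.transform(testDF)  // appends an Array[Double] prediction column
+   * }}}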
+ */ + protected override def internalTransform(dataFrame: DataFrame): DataFrame = { + val featureType = dataFrame.schema($(featuresCol)).dataType + val featureColIndex = dataFrame.schema.fieldIndex($(featuresCol)) + val featureFunc = getConvertFunc(featureType) + val sc = dataFrame.sqlContext.sparkContext + val modelBroadCast = ModelBroadcast[T]().broadcast(sc, model.evaluate()) + val localBatchSize = $(batchSize) + val transformerBC = sc.broadcast(SampleToMiniBatch[T](localBatchSize)) + + val resultRDD = dataFrame.rdd.mapPartitions { rowIter => + val localModel = modelBroadCast.value() + val transformer = transformerBC.value.cloneTransformer() + rowIter.grouped(localBatchSize).flatMap { rowBatch => + val samples = rowBatch.map { row => + val features = featureFunc(row, featureColIndex) + val featureBuffer = features.head match { + case dd: Double => features.asInstanceOf[Seq[Double]].map(ev.fromType(_)) + case ff: Float => features.asInstanceOf[Seq[Float]].map(ev.fromType(_)) + } + Sample(Tensor(featureBuffer.toArray, featureSize)) + }.toIterator + val predictions = transformer(samples).flatMap { batch => + val batchResult = localModel.forward(batch.getInput()) + batchResult.toTensor.split(1).map(outputToPrediction) + } + rowBatch.toIterator.zip(predictions).map { case (row, predict) => + Row.fromSeq(row.toSeq ++ Seq(predict)) + } + } + } + + val resultSchema = transformSchema(dataFrame.schema) + dataFrame.sqlContext.createDataFrame(resultRDD, resultSchema) + } + + protected def outputToPrediction(output: Tensor[T]): Any = { + output.clone().storage().array().map(ev.toType[Double]) + } + + override def transformSchema(schema : StructType): StructType = { + validateDataType(schema, $(featuresCol)) + SchemaUtils.appendColumn(schema, $(predictionCol), ArrayType(DoubleType, false)) + } + + override def copy(extra: ParamMap): DLModel[T] = { + val copied = new DLModel(model, featureSize, uid).setParent(parent) + copyValues(copied, extra) + } +} + +// TODO, add save/load +object DLModel { + + +} + diff --git a/spark/dl/src/main/scala/com/intel/analytics/bigdl/dlframes/DLImageReader.scala b/spark/dl/src/main/scala/com/intel/analytics/bigdl/dlframes/DLImageReader.scala new file mode 100644 index 00000000000..2df4112b3d7 --- /dev/null +++ b/spark/dl/src/main/scala/com/intel/analytics/bigdl/dlframes/DLImageReader.scala @@ -0,0 +1,144 @@ +/* + * Copyright 2016 The BigDL Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.intel.analytics.bigdl.dlframes + +import com.intel.analytics.bigdl.tensor.{Storage, Tensor} +import com.intel.analytics.bigdl.transform.vision.image.{BytesToMat, ImageFeature, ImageFrame} +import org.apache.spark.SparkContext +import org.apache.spark.sql.types._ +import org.apache.spark.sql.{DataFrame, Row, SQLContext} +import org.opencv.core.CvType +import scala.language.existentials + +/** + * Definition for image data in a DataFrame + */ +object DLImageSchema { + + /** + * Schema for the image column in a DataFrame. 
Image data is saved in an array of Bytes. + * The format is compatible with Spark Image format in v2.3 + */ + val byteSchema = StructType( + StructField("origin", StringType, true) :: + StructField("height", IntegerType, false) :: + StructField("width", IntegerType, false) :: + StructField("nChannels", IntegerType, false) :: + // OpenCV-compatible type: CV_8UC3, CV_32FC3 in most cases + StructField("mode", IntegerType, false) :: + // Bytes in OpenCV-compatible order: row-wise BGR in most cases + StructField("data", BinaryType, false) :: Nil) + + /** + * Schema for the image column in a DataFrame. Image data is saved in an array of Floats. + */ + val floatSchema = StructType( + StructField("origin", StringType, true) :: + StructField("height", IntegerType, false) :: + StructField("width", IntegerType, false) :: + StructField("nChannels", IntegerType, false) :: + // OpenCV-compatible type: CV_8UC3, CV_32FC3 in most cases + StructField("mode", IntegerType, false) :: + // floats in OpenCV-compatible order: row-wise BGR in most cases + StructField("data", new ArrayType(FloatType, false), false) :: Nil) + + private[dlframes] def imf2Row(imf: ImageFeature): Row = { + val (mode, data) = if (imf.contains(ImageFeature.imageTensor)) { + val floatData = imf(ImageFeature.imageTensor).asInstanceOf[Tensor[Float]].storage().array() + val cvType = imf.getChannel() match { + case 1 => CvType.CV_32FC1 + case 3 => CvType.CV_32FC3 + case 4 => CvType.CV_32FC4 + case other => throw new IllegalArgumentException(s"Unsupported number of channels:" + + s" $other in ${imf.uri()}. Only 1, 3 and 4 are supported.") + } + (cvType, floatData) + } else if (imf.contains(ImageFeature.bytes)) { + val bytesData = imf.bytes() + val cvType = imf.getChannel() match { + case 1 => CvType.CV_8UC1 + case 3 => CvType.CV_8UC3 + case 4 => CvType.CV_8UC4 + case other => throw new IllegalArgumentException(s"Unsupported number of channels:" + + s" $other in ${imf.uri()}. Only 1, 3 and 4 are supported.") + } + (cvType, bytesData) + } else { + throw new IllegalArgumentException(s"ImageFeature should have imageTensor or bytes.") + } + + Row( + imf.uri(), + imf.getHeight(), + imf.getWidth(), + imf.getChannel(), + mode, + data + ) + } + + private[dlframes] def row2IMF(row: Row): ImageFeature = { + val (origin, h, w, c) = (row.getString(0), row.getInt(1), row.getInt(2), row.getInt(3)) + val imf = ImageFeature() + imf.update(ImageFeature.uri, origin) + imf.update(ImageFeature.size, (h, w, c)) + val storageType = row.getInt(4) + storageType match { + case CvType.CV_8UC3 | CvType.CV_8UC1 | CvType.CV_8UC4 => + imf.update(ImageFeature.bytes, row.getAs[Array[Byte]](5)) + BytesToMat().transform(imf) + case CvType.CV_32FC3 | CvType.CV_32FC1 | CvType.CV_32FC4 => + val data = row.getSeq[Float](5).toArray + val size = Array(h, w, c) + val ten = Tensor(Storage(data)).resize(size) + imf.update(ImageFeature.imageTensor, ten) + case _ => + throw new IllegalArgumentException(s"Unsupported data type in imageColumn: $storageType") + } + imf + } +} + +/** + * Primary DataFrame-based image loading interface, defining API to read images into DataFrame. + */ +object DLImageReader { + + /** + * DataFrame with a single column of images named "image" (nullable) + */ + private val imageColumnSchema = + StructType(StructField("image", DLImageSchema.byteSchema, true) :: Nil) + + /** + * Read the directory of images into DataFrame from the local or remote source. 
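+   * For example (editor's sketch; the path is a placeholder):
+   * {{{
+   *   val imageDF = DLImageReader.readImages("hdfs://namenode:9000/images/*", sc)
+   * }}}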
+   *
+   * @param path Directory of the input data files; the path can be comma-separated paths as a
+   *             list of inputs. Wildcard paths are supported similarly to sc.binaryFiles(path).
+   * @param sc SparkContext to be used.
+   * @param minPartitions Number of the DataFrame partitions;
+   *                      if omitted, defaultParallelism is used instead
+   * @return DataFrame with a single column "image" of images;
+   *         see DLImageSchema for the details
+   */
+  def readImages(path: String, sc: SparkContext, minPartitions: Int = 1): DataFrame = {
+    val imageFrame = ImageFrame.read(path, sc, minPartitions)
+    val rowRDD = imageFrame.toDistributed().rdd.map { imf =>
+      Row(DLImageSchema.imf2Row(imf))
+    }
+    SQLContext.getOrCreate(sc).createDataFrame(rowRDD, imageColumnSchema)
+  }
+}

diff --git a/spark/dl/src/main/scala/com/intel/analytics/bigdl/dlframes/DLImageTransformer.scala b/spark/dl/src/main/scala/com/intel/analytics/bigdl/dlframes/DLImageTransformer.scala
new file mode 100644
index 00000000000..fa728e0eb17
--- /dev/null
+++ b/spark/dl/src/main/scala/com/intel/analytics/bigdl/dlframes/DLImageTransformer.scala
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2016 The BigDL Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.intel.analytics.bigdl.dlframes
+
+import com.intel.analytics.bigdl.dataset.Transformer
+import com.intel.analytics.bigdl.transform.vision.image.{FeatureTransformer, ImageFeature, MatToTensor}
+import org.apache.spark.ml.DLTransformerBase
+import org.apache.spark.ml.adapter.{HasInputCol, HasOutputCol, SchemaUtils}
+import org.apache.spark.ml.util.Identifiable
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.{DataFrame, Row}
+
+/**
+ * Provides a DataFrame-based API for image pre-processing and feature transformation.
+ * DLImageTransformer follows the Spark Transformer API pattern and can be used as one stage
+ * in a Spark ML pipeline.
+ *
+ * The input column can be either DLImageSchema.byteSchema or DLImageSchema.floatSchema. If
+ * using DLImageReader, the default format is DLImageSchema.byteSchema.
+ * The output column is always DLImageSchema.floatSchema.
+ *
+ * @param transformer Single or a sequence of BigDL FeatureTransformers to be used. E.g.
+ * Resize(256, 256) -> CenterCrop(224, 224) -> + * ChannelNormalize(123, 117, 104, 1, 1, 1) -> MatToTensor() + */ +class DLImageTransformer ( + val transformer: Transformer[ImageFeature, ImageFeature], + override val uid: String) + extends DLTransformerBase with HasInputCol with HasOutputCol { + + def this(transformer: FeatureTransformer) = + this(transformer, Identifiable.randomUID("DLImageTransformer")) + + setDefault(inputCol -> "image") + def setInputCol(value: String): this.type = set(inputCol, value) + + setDefault(outputCol -> "output") + def setOutputCol(value: String): this.type = set(outputCol, value) + + protected def validateInputType(inputType: DataType): Unit = { + val validTypes = Array(DLImageSchema.floatSchema, DLImageSchema.byteSchema) + + require(validTypes.exists(t => SchemaUtils.sameType(inputType, t)), + s"Bad input type: $inputType. Requires ${validTypes.mkString(", ")}") + } + + override def transformSchema(schema: StructType): StructType = { + val inputType = schema($(inputCol)).dataType + validateInputType(inputType) + if (schema.fieldNames.contains($(outputCol))) { + throw new IllegalArgumentException(s"Output column ${$(outputCol)} already exists.") + } + + val outputFields = schema.fields :+ + StructField($(outputCol), DLImageSchema.floatSchema, nullable = false) + StructType(outputFields) + } + + protected override def internalTransform(dataFrame: DataFrame): DataFrame = { + transformSchema(dataFrame.schema, logging = true) + val sc = dataFrame.sqlContext.sparkContext + val localTransformer = this.transformer + val transformerBC = sc.broadcast(localTransformer) + val toTensorBC = sc.broadcast(MatToTensor[Float](shareBuffer = true)) + + val inputColIndex = dataFrame.schema.fieldIndex($(inputCol)) + val resultRDD = dataFrame.rdd.mapPartitions { rowIter => + val localTransformer = transformerBC.value.cloneTransformer() + val toTensorTransformer = toTensorBC.value.cloneTransformer().asInstanceOf[MatToTensor[Float]] + rowIter.map { row => + val imf = DLImageSchema.row2IMF(row.getAs[Row](inputColIndex)) + val output = localTransformer.apply(Iterator(imf)).toArray.head + if (!output.contains(ImageFeature.imageTensor)) { + toTensorTransformer.transform(output) + } + Row.fromSeq(row.toSeq ++ Seq(DLImageSchema.imf2Row(output))) + } + } + + val resultSchema = transformSchema(dataFrame.schema) + dataFrame.sqlContext.createDataFrame(resultRDD, resultSchema) + } +} diff --git a/spark/dl/src/main/scala/com/intel/analytics/bigdl/dlframes/SharedParamsAdapter.scala b/spark/dl/src/main/scala/com/intel/analytics/bigdl/dlframes/SharedParamsAdapter.scala new file mode 100644 index 00000000000..c7737f74de0 --- /dev/null +++ b/spark/dl/src/main/scala/com/intel/analytics/bigdl/dlframes/SharedParamsAdapter.scala @@ -0,0 +1,53 @@ +/* + * Copyright 2016 The BigDL Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.ml.adapter + +import org.apache.spark.sql.types.{DataType, StructField, StructType} + + +trait HasPredictionCol extends org.apache.spark.ml.param.shared.HasPredictionCol + +trait HasFeaturesCol extends org.apache.spark.ml.param.shared.HasFeaturesCol + +trait HasInputCol extends org.apache.spark.ml.param.shared.HasInputCol + +trait HasOutputCol extends org.apache.spark.ml.param.shared.HasOutputCol + +object SchemaUtils { + + /** + * Appends a new column to the input schema. This fails if the given output column already exists + * @param schema input schema + * @param colName new column name. If this column name is an empty string "", this method returns + * the input schema unchanged. This allows users to disable output columns. + * @param dataType new column data type + * @return new schema with the input column appended + */ + def appendColumn( + schema: StructType, + colName: String, + dataType: DataType, + nullable: Boolean = false): StructType = { + + val colSF = StructField(colName, dataType, nullable) + require(!schema.fieldNames.contains(colSF.name), s"Column ${colSF.name} already exists.") + StructType(schema.fields :+ colSF) + } + + def sameType(a: DataType, b: DataType): Boolean = a.sameType(b) + +} diff --git a/spark/dl/src/test/scala/com/intel/analytics/bigdl/optim/dlframes/DLClassifierSpec.scala b/spark/dl/src/test/scala/com/intel/analytics/bigdl/optim/dlframes/DLClassifierSpec.scala new file mode 100644 index 00000000000..2d9eb786a2c --- /dev/null +++ b/spark/dl/src/test/scala/com/intel/analytics/bigdl/optim/dlframes/DLClassifierSpec.scala @@ -0,0 +1,247 @@ +/* + * Copyright 2016 The BigDL Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package com.intel.analytics.bigdl.dlframes
+
+import com.intel.analytics.bigdl.models.lenet.LeNet5
+import com.intel.analytics.bigdl.nn._
+import com.intel.analytics.bigdl.optim.{Adam, LBFGS, Loss, Trigger}
+import com.intel.analytics.bigdl.tensor.Tensor
+import com.intel.analytics.bigdl.tensor.TensorNumericMath.TensorNumeric.NumericFloat
+import com.intel.analytics.bigdl.utils.Engine
+import com.intel.analytics.bigdl.utils.RandomGenerator.RNG
+import com.intel.analytics.bigdl.visualization.ValidationSummary
+import org.apache.log4j.{Level, Logger}
+import org.apache.spark.ml.feature.MinMaxScaler
+import org.apache.spark.SparkContext
+import org.apache.spark.ml._
+import org.apache.spark.mllib.linalg.Vectors
+import org.apache.spark.sql.{DataFrame, SQLContext}
+import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Random
+
+class DLClassifierSpec extends FlatSpec with Matchers with BeforeAndAfter {
+  var sc : SparkContext = _
+  var sqlContext : SQLContext = _
+  var smallData: Seq[(Array[Double], Double)] = _
+  val nRecords = 100
+  val maxEpoch = 20
+
+  before {
+    val conf = Engine.createSparkConf().setAppName("Test DLEstimator").setMaster("local[1]")
+    sc = SparkContext.getOrCreate(conf)
+    sqlContext = new SQLContext(sc)
+    Random.setSeed(42)
+    RNG.setSeed(42)
+    smallData = DLEstimatorSpec.generateTestInput(
+      nRecords, Array(1.0, 2.0, 3.0, 4.0, 5.0, 6.0), -1.0, 42L)
+    Engine.init
+  }
+
+  after {
+    if (sc != null) {
+      sc.stop()
+    }
+  }
+
+  "A DLClassifier" should "have correct default params" in {
+    val model = Linear[Float](10, 1)
+    val criterion = ClassNLLCriterion[Float]()
+    val estimator = new DLClassifier[Float](model, criterion, Array(10))
+    assert(estimator.getFeaturesCol == "features")
+    assert(estimator.getLabelCol == "label")
+    assert(estimator.getMaxEpoch == 50)
+    assert(estimator.getBatchSize == 1)
+    assert(estimator.getLearningRate == 1e-3)
+    assert(estimator.getLearningRateDecay == 0)
+  }
+
+  "A DLClassifier" should "get reasonable accuracy" in {
+    val model = new Sequential().add(Linear[Float](6, 2)).add(LogSoftMax[Float])
+    val criterion = ClassNLLCriterion[Float]()
+    val classifier = new DLClassifier[Float](model, criterion, Array(6))
+      .setOptimMethod(new LBFGS[Float]())
+      .setLearningRate(0.1)
+      .setBatchSize(nRecords)
+      .setMaxEpoch(maxEpoch)
+    val data = sc.parallelize(smallData)
+    val df = sqlContext.createDataFrame(data).toDF("features", "label")
+
+    val dlModel = classifier.fit(df)
+    dlModel.isInstanceOf[DLClassifierModel[_]] should be(true)
+    assert(dlModel.transform(df).where("prediction=label").count() > nRecords * 0.8)
+  }
+
+  "A DLClassifier" should "support different FEATURE types" in {
+    val model = new Sequential().add(Linear[Float](6, 2)).add(LogSoftMax[Float])
+    val criterion = ClassNLLCriterion[Float]()
+    val classifier = new DLClassifier[Float](model, criterion, Array(6))
+      .setLearningRate(0.1)
+      .setBatchSize(2)
+      .setEndWhen(Trigger.maxIteration(2))
+
+    Array(
+      sqlContext.createDataFrame(sc.parallelize(smallData.map(p => (p._1, p._2))))
+        .toDF("features", "label"), // Array[Double]
+      sqlContext.createDataFrame(sc.parallelize(smallData.map(p => (p._1.map(_.toFloat), p._2))))
+        .toDF("features", "label"), // Array[Float]
+      sqlContext.createDataFrame(sc.parallelize(smallData.map(p => (Vectors.dense(p._1), p._2))))
+        .toDF("features", "label") // MLlib Vector
+      // TODO: add ML Vector when ut for Spark 2.0+ is ready
+    ).foreach { df =>
+      val dlModel = classifier.fit(df)
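+      // transform appends a Double "prediction" column; collect() just forces the
+      // computation, verifying each supported feature type converts cleanly
+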
dlModel.transform(df).collect() + } + } + + "An DLClassifier" should "support scalar FEATURE" in { + val model = new Sequential().add(Linear[Float](1, 2)).add(LogSoftMax[Float]) + val criterion = ClassNLLCriterion[Float]() + val classifier = new DLClassifier[Float](model, criterion, Array(1)) + .setLearningRate(0.1) + .setBatchSize(2) + .setEndWhen(Trigger.maxIteration(2)) + + Array( + sqlContext.createDataFrame(sc.parallelize(smallData.map(p => (p._1.head.toFloat, p._2)))) + .toDF("features", "label"), // Float + sqlContext.createDataFrame(sc.parallelize(smallData.map(p => (p._1.head, p._2)))) + .toDF("features", "label") // Double + // TODO: add ML Vector when ut for Spark 2.0+ is ready + ).foreach { df => + val dlModel = classifier.fit(df) + dlModel.transform(df).collect() + } + } + + "An DLClassifier" should "support binary classification" in { + val model = new Sequential().add(Linear[Float](6, 1)).add(Sigmoid[Float]) + val criterion = BCECriterion[Float]() + val classifier = new DLClassifier[Float](model, criterion, Array(6)) + .setLearningRate(0.1) + .setBatchSize(2) + .setEndWhen(Trigger.maxIteration(10)) + + Array( + sqlContext.createDataFrame(sc.parallelize( + smallData.map(p => (p._1.map(_.toFloat), p._2.toFloat - 1)))) + .toDF("features", "label"), // Float + sqlContext.createDataFrame(sc.parallelize(smallData.map(p => (p._1, p._2 - 1)))) + .toDF("features", "label") // Double + // TODO: add ML Vector when ut for Spark 2.0+ is ready + ).foreach { df => + val dlModel = classifier.fit(df) + val result = dlModel.transform(df).collect() + val accuracy = result.count(v => v.get(1) == v.get(2)).toDouble / smallData.size + accuracy should be > math.max(smallData.count(_._2 == 1), + smallData.count(_._2 == 2)).toDouble / smallData.size + } + } + + "An DLClassifier" should "fit with adam and LBFGS" in { + val model = new Sequential().add(Linear[Float](6, 2)).add(LogSoftMax[Float]) + val criterion = ClassNLLCriterion[Float]() + Seq(new LBFGS[Float], new Adam[Float]).foreach { optimMethod => + val classifier = new DLClassifier[Float](model, criterion, Array(6)) + .setBatchSize(nRecords) + .setMaxEpoch(2) + .setOptimMethod(optimMethod) + .setLearningRate(0.1) + val data = sc.parallelize(smallData) + val df = sqlContext.createDataFrame(data).toDF("features", "label") + val dlModel = classifier.fit(df) + dlModel.isInstanceOf[DLClassifierModel[_]] should be(true) + } + } + + "An DLClassifier" should "supports validation data and summary" in { + val data = sc.parallelize(smallData) + val df = sqlContext.createDataFrame(data).toDF("features", "label") + + val logdir = com.google.common.io.Files.createTempDir() + val model = new Sequential().add(Linear[Float](6, 2)).add(LogSoftMax[Float]) + val criterion = ClassNLLCriterion[Float]() + val classifier = new DLClassifier[Float](model, criterion, Array(6)) + .setBatchSize(nRecords) + .setEndWhen(Trigger.maxIteration(5)) + .setOptimMethod(new Adam[Float]) + .setLearningRate(0.1) + .setValidation(Trigger.severalIteration(1), df, Array(new Loss[Float]()), 2) + .setValidationSummary(ValidationSummary(logdir.getPath, "DLEstimatorValidation")) + + classifier.fit(df) + val validationSummary = classifier.getValidationSummary.get + val losses = validationSummary.readScalar("Loss") + validationSummary.close() + logdir.deleteOnExit() + } + + "An DLClassifier" should "get the same classification result with BigDL model" in { + Logger.getLogger("org").setLevel(Level.WARN) + Logger.getLogger("akka").setLevel(Level.WARN) + + val model = LeNet5(10) + + // init + 
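+    // DLClassifierModel wraps the already-trained LeNet5 module, so no fit() step is
+    // needed; featureSize Array(28, 28) must match the model's expected input shape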
val valTrans = new DLClassifierModel[Float](model, Array(28, 28)) + .setBatchSize(4) + + val tensorBuffer = new ArrayBuffer[Data]() + // generate test data with BigDL + val input = Tensor[Float](10, 28, 28).apply1(e => Random.nextFloat()) + val target = model.forward(input).toTensor[Float] + + // test against DLClassifierModel + val inputArr = input.storage().array() + val targetArr = target.max(2)._2.squeeze().storage().array() + (0 until 10).foreach(i => + tensorBuffer.append( + Data(targetArr(i), inputArr.slice(i * 28 * 28, (i + 1) * 28 * 28).map(_.toDouble)))) + val rowRDD = sc.parallelize(tensorBuffer) + val testData = sqlContext.createDataFrame(rowRDD) + assert(valTrans.transform(testData).where("prediction=label").count() == testData.count()) + tensorBuffer.clear() + } + + "An DLClassifier" should "works in ML pipeline" in { + var appSparkVersion = org.apache.spark.SPARK_VERSION + if (appSparkVersion.trim.startsWith("1")) { + val data = sc.parallelize( + smallData.map(p => (org.apache.spark.mllib.linalg.Vectors.dense(p._1), p._2))) + val df: DataFrame = sqlContext.createDataFrame(data).toDF("features", "label") + + val scaler = new MinMaxScaler().setInputCol("features").setOutputCol("scaled") + .setMax(1).setMin(-1) + val model = new Sequential().add(Linear[Float](6, 2)).add(LogSoftMax[Float]) + val criterion = ClassNLLCriterion[Float]() + val estimator = new DLClassifier[Float](model, criterion, Array(6)) + .setBatchSize(nRecords) + .setOptimMethod(new LBFGS[Float]()) + .setLearningRate(0.1) + .setMaxEpoch(maxEpoch) + .setFeaturesCol("scaled") + val pipeline = new Pipeline().setStages(Array(scaler, estimator)) + + val pipelineModel = pipeline.fit(df) + pipelineModel.isInstanceOf[PipelineModel] should be(true) + assert(pipelineModel.transform(df).where("prediction=label").count() > nRecords * 0.8) + } + } +} + +private case class Data(label: Double, features: Array[Double]) diff --git a/spark/dl/src/test/scala/com/intel/analytics/bigdl/optim/dlframes/DLEstimatorSpec.scala b/spark/dl/src/test/scala/com/intel/analytics/bigdl/optim/dlframes/DLEstimatorSpec.scala new file mode 100644 index 00000000000..2126f556c14 --- /dev/null +++ b/spark/dl/src/test/scala/com/intel/analytics/bigdl/optim/dlframes/DLEstimatorSpec.scala @@ -0,0 +1,348 @@ +/* + * Copyright 2016 The BigDL Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.intel.analytics.bigdl.dlframes + +import com.intel.analytics.bigdl.nn._ +import com.intel.analytics.bigdl.optim.{LBFGS, Loss, Trigger} +import com.intel.analytics.bigdl.tensor.Tensor +import com.intel.analytics.bigdl.tensor.TensorNumericMath.TensorNumeric.NumericFloat +import com.intel.analytics.bigdl.utils.Engine +import com.intel.analytics.bigdl.utils.RandomGenerator.RNG +import com.intel.analytics.bigdl.visualization.{TrainSummary, ValidationSummary} +import org.apache.spark.SparkContext +import org.apache.spark.ml.feature.MinMaxScaler +import org.apache.spark.ml.{Pipeline, PipelineModel} +import org.apache.spark.mllib.linalg.Vectors +import org.apache.spark.sql.{DataFrame, Row, SQLContext} +import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers} + +import scala.util.Random + +class DLEstimatorSpec extends FlatSpec with Matchers with BeforeAndAfter { + val model = new Sequential[Float]() + var sc : SparkContext = _ + var sqlContext : SQLContext = _ + var smallData: Seq[(Array[Double], Double)] = _ + val nRecords = 100 + val maxEpoch = 20 + + before { + Random.setSeed(42) + RNG.setSeed(42) + val conf = Engine.createSparkConf().setAppName("Test DLEstimator").setMaster("local[1]") + sc = SparkContext.getOrCreate(conf) + sqlContext = new SQLContext(sc) + smallData = DLEstimatorSpec.generateTestInput( + nRecords, Array(1.0, 2.0, 3.0, 4.0, 5.0, 6.0), -1.0, 42L) + Engine.init + } + + after{ + if (sc != null) { + sc.stop() + } + } + + "An DLEstimator" should "has correct default params" in { + val model = Linear[Float](10, 1) + val criterion = ClassNLLCriterion[Float]() + val estimator = new DLEstimator[Float](model, criterion, Array(10), Array(1)) + assert(estimator.getFeaturesCol == "features") + assert(estimator.getLabelCol == "label") + assert(estimator.getMaxEpoch == 50) + assert(estimator.getBatchSize == 1) + assert(estimator.getLearningRate == 1e-3) + assert(estimator.getLearningRateDecay == 0) + + } + + "An DLEstimator" should "get reasonable accuracy" in { + val model = new Sequential().add(Linear[Float](6, 2)).add(LogSoftMax[Float]) + val criterion = ClassNLLCriterion[Float]() + val estimator = new DLEstimator[Float](model, criterion, Array(6), Array(1)) + .setBatchSize(nRecords) + .setOptimMethod(new LBFGS[Float]()) + .setLearningRate(0.1) + .setMaxEpoch(maxEpoch) + val data = sc.parallelize(smallData) + val df = sqlContext.createDataFrame(data).toDF("features", "label") + + val dlModel = estimator.fit(df) + dlModel.isInstanceOf[DLModel[_]] should be(true) + val correct = dlModel.transform(df).select("label", "prediction").rdd.filter { + case Row(label: Double, prediction: Seq[_]) => + label == prediction.indexOf(prediction.asInstanceOf[Seq[Double]].max) + 1 + }.count() + assert(correct > nRecords * 0.8) + } + + "An DLEstimator" should "support different FEATURE types" in { + val model = new Sequential().add(Linear[Float](6, 2)).add(LogSoftMax[Float]) + val criterion = ClassNLLCriterion[Float]() + val estimator = new DLEstimator[Float](model, criterion, Array(6), Array(1)) + .setBatchSize(2) + // intentionally set low since this only validates data format compatibility + .setEndWhen(Trigger.maxIteration(1)) + + Array( + sqlContext.createDataFrame(sc.parallelize(smallData.map(p => (p._1, p._2)))) + .toDF("features", "label"), // Array[Double] + sqlContext.createDataFrame(sc.parallelize(smallData.map(p => (p._1.map(_.toFloat), p._2)))) + .toDF("features", "label"), // Array[Float] + sqlContext.createDataFrame(sc.parallelize(smallData.map(p => (Vectors.dense(p._1), 
p._2)))
+        .toDF("features", "label") // MLlib Vector
+      // TODO: add ML Vector when ut for Spark 2.0+ is ready
+    ).foreach { df =>
+      val dlModel = estimator.fit(df)
+      dlModel.transform(df).collect()
+    }
+  }
+
+  "A DLEstimator" should "support scalar FEATURE types" in {
+    val model = new Sequential().add(Linear[Float](1, 2)).add(LogSoftMax[Float])
+    val criterion = ClassNLLCriterion[Float]()
+    val estimator = new DLEstimator[Float](model, criterion, Array(1), Array(1))
+      .setBatchSize(2)
+      // intentionally set low since this only validates data format compatibility
+      .setEndWhen(Trigger.maxIteration(1))
+
+    Array(
+      sqlContext.createDataFrame(sc.parallelize(smallData.map(p => (p._1.head.toFloat, p._2))))
+        .toDF("features", "label"), // Float
+      sqlContext.createDataFrame(sc.parallelize(smallData.map(p => (p._1.head, p._2))))
+        .toDF("features", "label") // Double
+      // TODO: add ML Vector when ut for Spark 2.0+ is ready
+    ).foreach { df =>
+      val dlModel = estimator.fit(df)
+      dlModel.transform(df).collect()
+    }
+  }
+
+  "A DLEstimator" should "support different LABEL types" in {
+    val model = new Sequential().add(Linear[Float](6, 2)).add(LogSoftMax[Float])
+    val criterion = MultiLabelSoftMarginCriterion[Float]()
+    val estimator = new DLEstimator[Float](model, criterion, Array(6), Array(2))
+      // intentionally set low since this only validates data format compatibility
+      .setEndWhen(Trigger.maxIteration(1))
+      .setBatchSize(2)
+
+    Array(
+      sqlContext.createDataFrame(sc.parallelize(smallData.map(p => (p._1, Array(p._2, p._2)))))
+        .toDF("features", "label"), // Array[Double]
+      sqlContext.createDataFrame(sc.parallelize(smallData.map(p => (p._1,
+        Array(p._2.toFloat, p._2.toFloat))))).toDF("features", "label"), // Array[Float]
+      sqlContext.createDataFrame(sc.parallelize(smallData.map(p => (p._1,
+        Vectors.dense(p._2, p._2))))).toDF("features", "label") // MLlib Vector
+      // TODO: add ML Vector when ut for Spark 2.0+ is ready
+    ).foreach { df =>
+      val dlModel = estimator.fit(df)
+      dlModel.transform(df).collect()
+    }
+  }
+
+  "A DLEstimator" should "support scalar LABEL types" in {
+    val model = new Sequential().add(Linear[Float](6, 2)).add(LogSoftMax[Float])
+    val criterion = ClassNLLCriterion[Float]()
+    val estimator = new DLEstimator[Float](model, criterion, Array(6), Array(1))
+      // intentionally set low since this only validates data format compatibility
+      .setEndWhen(Trigger.maxIteration(1))
+      .setBatchSize(2)
+
+    Array(
+      sqlContext.createDataFrame(sc.parallelize(smallData.map(p => (p._1, p._2.toFloat))))
+        .toDF("features", "label"), // Float
+      sqlContext.createDataFrame(sc.parallelize(smallData.map(p => (p._1, p._2))))
+        .toDF("features", "label") // Double
+      // TODO: add ML Vector when ut for Spark 2.0+ is ready
+    ).foreach { df =>
+      val dlModel = estimator.fit(df)
+      dlModel.transform(df).collect()
+    }
+  }
+
+  "A DLEstimator" should "work with tensor data" in {
+
+    val model = Linear[Float](10, 1)
+    val criterion = ClassNLLCriterion[Float]()
+    val estimator = new DLEstimator[Float](model, criterion, Array(10), Array(1))
+      .setMaxEpoch(1)
+      .setBatchSize(nRecords)
+
+    val featureData = Array.tabulate(100)(_ => Tensor(10))
+    val labelData = Array.tabulate(100)(_ => Tensor(1).fill(1.0f))
+    val miniBatch = sc.parallelize(
+      featureData.zip(labelData).map(v =>
+        MinibatchData(v._1.storage.array, v._2.storage.array))
+    )
+    val trainingDF: DataFrame = sqlContext.createDataFrame(miniBatch).toDF("features", "label")
+
+    val dlModel = estimator.fit(trainingDF)
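+    // the Tensor features above were flattened into plain Float arrays by MinibatchData;
+    // transform re-assembles them into Tensors of shape featureSize = Array(10)
+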
dlModel.transform(trainingDF).collect() + } + + "An DLEstimator" should "support different batchSize" in { + val model = new Sequential().add(Linear[Float](6, 2)).add(LogSoftMax[Float]) + val criterion = ClassNLLCriterion[Float]() + val estimator = new DLEstimator[Float](model, criterion, Array(6), Array(1)) + .setBatchSize(51) + .setMaxEpoch(maxEpoch) + val data = sc.parallelize(smallData) + val df: DataFrame = sqlContext.createDataFrame(data).toDF("features", "label") + + val dlModel = estimator.fit(df) + dlModel.isInstanceOf[DLModel[_]] should be(true) + dlModel.transform(df).count() + } + + "An DLModel" should "support transform with different batchSize" in { + val model = new Sequential().add(Linear[Float](6, 2)).add(LogSoftMax[Float]) + val criterion = ClassNLLCriterion[Float]() + val estimator = new DLEstimator[Float](model, criterion, Array(6), Array(1)) + .setBatchSize(nRecords) + .setMaxEpoch(maxEpoch) + val data = sc.parallelize(smallData) + val df: DataFrame = sqlContext.createDataFrame(data).toDF("features", "label") + val dlModel = estimator.fit(df) + assert(df.count() == dlModel.setBatchSize(51).transform(df).count()) + } + + "An DLEstimator" should "throws exception without correct inputs" in { + val model = Linear[Float](10, 1) + val criterion = ClassNLLCriterion[Float]() + val inputs = Array[String]("Feature data", "Label data") + var estimator = new DLEstimator[Float](model, criterion, Array(10), Array(2, 1)). + setFeaturesCol(inputs(0)).setLabelCol(inputs(1)) + + val featureData = Tensor(2, 10) + val labelData = Tensor(2, 1) + val miniBatch = sc.parallelize(Seq( + MinibatchData[Float](featureData.storage().array(), labelData.storage().array()) + )) + var df: DataFrame = sqlContext.createDataFrame(miniBatch).toDF(inputs: _*) + // Spark 1.6 and 2.0 throws different exception here + intercept[Exception] { + estimator.fit(df) + } + } + + "An DLEstimator" should "supports training summary" in { + val model = new Sequential().add(Linear[Float](6, 2)).add(LogSoftMax[Float]) + val criterion = ClassNLLCriterion[Float]() + val logdir = com.google.common.io.Files.createTempDir() + val estimator = new DLEstimator[Float](model, criterion, Array(6), Array(1)) + .setBatchSize(nRecords) + .setMaxEpoch(5) + .setTrainSummary(TrainSummary(logdir.getPath, "DLEstimatorTrain")) + val data = sc.parallelize(smallData) + val df = sqlContext.createDataFrame(data).toDF("features", "label") + + val dlModel = estimator.fit(df) + val trainSummary = estimator.getTrainSummary.get + val losses = trainSummary.readScalar("Loss") + assert(losses.length == 5) + trainSummary.close() + logdir.deleteOnExit() + } + + "An DLEstimator" should "supports validation data and summary" in { + val model = new Sequential().add(Linear[Float](6, 2)).add(LogSoftMax[Float]) + val criterion = ClassNLLCriterion[Float]() + val logdir = com.google.common.io.Files.createTempDir() + val data = sc.parallelize(smallData) + val df = sqlContext.createDataFrame(data).toDF("features", "label") + val estimator = new DLEstimator[Float](model, criterion, Array(6), Array(1)) + .setBatchSize(4) + .setEndWhen(Trigger.maxIteration(5)) + .setValidation(Trigger.severalIteration(1), df, Array(new Loss[Float]()), 2) + .setValidationSummary(ValidationSummary(logdir.getPath, "DLEstimatorValidation")) + + val dlModel = estimator.fit(df) + val validationSummary = estimator.getValidationSummary.get + val losses = validationSummary.readScalar("Loss") + assert(losses.length == 5) + validationSummary.close() + logdir.deleteOnExit() + } + + "An 
DLEstimator" should "throws exception when EndWhen and MaxEpoch are set" in { + val model = new Sequential().add(Linear[Float](6, 2)).add(LogSoftMax[Float]) + val criterion = ClassNLLCriterion[Float]() + val logdir = com.google.common.io.Files.createTempDir() + + val data = sc.parallelize(smallData) + val df = sqlContext.createDataFrame(data).toDF("features", "label") + val estimator = new DLEstimator[Float](model, criterion, Array(6), Array(1)) + .setBatchSize(4) + .setEndWhen(Trigger.maxIteration(5)) + .setMaxEpoch(5) + + intercept[Exception] { + estimator.fit(df) + } + } + + "An DLEstimator" should "works in ML pipeline" in { + var appSparkVersion = org.apache.spark.SPARK_VERSION + if (appSparkVersion.trim.startsWith("1")) { + val data = sc.parallelize( + smallData.map(p => (org.apache.spark.mllib.linalg.Vectors.dense(p._1), p._2))) + val df: DataFrame = sqlContext.createDataFrame(data).toDF("features", "label") + + val scaler = new MinMaxScaler().setInputCol("features").setOutputCol("scaled") + .setMax(1).setMin(-1) + val model = new Sequential().add(Linear[Float](6, 2)).add(LogSoftMax[Float]) + val criterion = ClassNLLCriterion[Float]() + val estimator = new DLEstimator[Float](model, criterion, Array(6), Array(1)) + .setOptimMethod(new LBFGS[Float]()) + .setLearningRate(0.1) + .setBatchSize(nRecords) + .setMaxEpoch(maxEpoch) + .setFeaturesCol("scaled") + val pipeline = new Pipeline().setStages(Array(scaler, estimator)) + + val pipelineModel = pipeline.fit(df) + pipelineModel.isInstanceOf[PipelineModel] should be(true) + val correct = pipelineModel.transform(df).select("label", "prediction").rdd.filter { + case Row(label: Double, prediction: Seq[_]) => + label == prediction.indexOf(prediction.asInstanceOf[Seq[Double]].max) + 1 + }.count() + assert(correct > nRecords * 0.8) + } + } +} + +private case class MinibatchData[T](featureData : Array[T], labelData : Array[T]) + +object DLEstimatorSpec { + // Generate noisy input of the form Y = signum(x.dot(weights) + intercept + noise) + def generateTestInput( + numRecords: Int, + weight: Array[Double], + intercept: Double, + seed: Long): Seq[(Array[Double], Double)] = { + val rnd = new Random(seed) + val data = (1 to numRecords) + .map( i => Array.tabulate(weight.length)(index => rnd.nextDouble() * 2 - 1)) + .map { record => + val y = record.zip(weight).map(t => t._1 * t._2).sum + +intercept + 0.01 * rnd.nextGaussian() + val label = if (y > 0) 2.0 else 1.0 + (record, label) + } + data + } +} diff --git a/spark/dl/src/test/scala/com/intel/analytics/bigdl/optim/dlframes/DLImageReaderSpec.scala b/spark/dl/src/test/scala/com/intel/analytics/bigdl/optim/dlframes/DLImageReaderSpec.scala new file mode 100644 index 00000000000..e6ba6d1e98e --- /dev/null +++ b/spark/dl/src/test/scala/com/intel/analytics/bigdl/optim/dlframes/DLImageReaderSpec.scala @@ -0,0 +1,118 @@ +/* + * Copyright 2016 The BigDL Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+}
diff --git a/spark/dl/src/test/scala/com/intel/analytics/bigdl/optim/dlframes/DLImageReaderSpec.scala b/spark/dl/src/test/scala/com/intel/analytics/bigdl/optim/dlframes/DLImageReaderSpec.scala
new file mode 100644
index 00000000000..e6ba6d1e98e
--- /dev/null
+++ b/spark/dl/src/test/scala/com/intel/analytics/bigdl/optim/dlframes/DLImageReaderSpec.scala
@@ -0,0 +1,118 @@
+/*
+ * Copyright 2016 The BigDL Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.intel.analytics.bigdl.dlframes
+
+import com.intel.analytics.bigdl.utils.Engine
+import com.intel.analytics.bigdl.utils.RandomGenerator.RNG
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{Row, SQLContext}
+import org.opencv.core.CvType
+import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers}
+
+import scala.util.Random
+
+class DLImageReaderSpec extends FlatSpec with Matchers with BeforeAndAfter {
+
+  private var sc: SparkContext = _
+  private var sQLContext: SQLContext = _
+  private val pascalResource = getClass.getClassLoader.getResource("pascal/")
+  private val imageNetResource = getClass.getClassLoader.getResource("imagenet/")
+
+  before {
+    val conf = Engine.createSparkConf().setAppName("Test DLImageReader").setMaster("local[1]")
+    sc = SparkContext.getOrCreate(conf)
+    sQLContext = new SQLContext(sc)
+
+    Random.setSeed(42)
+    RNG.setSeed(42)
+
+    Engine.init
+  }
+
+  after {
+    if (sc != null) {
+      sc.stop()
+    }
+  }
+
+  "DLImageReader" should "have correct result for pascal" in {
+    val imageDF = DLImageReader.readImages(pascalResource.getFile, sc)
+    assert(imageDF.count() == 1)
+    val r = imageDF.head().getAs[Row](0)
+    assert(r.getString(0).endsWith("000025.jpg"))
+    assert(r.getInt(1) == 375)
+    assert(r.getInt(2) == 500)
+    assert(r.getInt(3) == 3)
+    assert(r.getInt(4) == CvType.CV_8UC3)
+    assert(r.getAs[Array[Byte]](5).length == 95959)
+  }
+
+  "DLImageReader" should "have correct result for imageNet" in {
+    val imageDirectory = imageNetResource + "n02110063/"
+    val imageDF = DLImageReader.readImages(imageDirectory, sc)
+    assert(imageDF.count() == 3)
+    val expectedRows = Seq(
+      (imageDirectory + "n02110063_8651.JPEG", 99, 129, 3, CvType.CV_8UC3),
+      (imageDirectory + "n02110063_11239.JPEG", 333, 500, 3, CvType.CV_8UC3),
+      (imageDirectory + "n02110063_15462.JPEG", 332, 500, 3, CvType.CV_8UC3)
+    )
+    val actualRows = imageDF.rdd.collect().map(r => r.getAs[Row](0)).map { r =>
+      (r.getString(0), r.getInt(1), r.getInt(2), r.getInt(3), r.getInt(4))
+    }
+    assert(expectedRows.toSet == actualRows.toSet)
+  }
+
+  "DLImageReader" should "have correct result for imageNet with channel 1 and 4" in {
+    val imageDirectory = imageNetResource + "n99999999/"
+    val imageDF = DLImageReader.readImages(imageDirectory, sc)
+    assert(imageDF.count() == 3)
+    val expectedRows = Seq(
+      (imageDirectory + "n02105855_2933.JPEG", 189, 213, 4, CvType.CV_8UC4),
+      (imageDirectory + "n02105855_test1.bmp", 527, 556, 1, CvType.CV_8UC1),
+      (imageDirectory + "n03000134_4970.JPEG", 480, 640, 3, CvType.CV_8UC3)
+    )
+    val actualRows = imageDF.rdd.collect().map(r => r.getAs[Row](0)).map { r =>
+      (r.getString(0), r.getInt(1), r.getInt(2), r.getInt(3), r.getInt(4))
+    }
+    assert(expectedRows.toSet == actualRows.toSet)
+  }
+
+  "DLImageReader" should "read recursively by wildcard path" in {
+    val imageDF = DLImageReader.readImages(imageNetResource.getFile + "*", sc)
+    assert(imageDF.count() == 11)
+  }
+
+  "DLImageReader" should "read from multiple paths" in {
+    val imageDirectory1 = imageNetResource + "n02110063/"
+    val imageDirectory2 = imageNetResource + "n99999999/"
+    val imageDF = DLImageReader.readImages(imageDirectory1 + "," + imageDirectory2, sc)
+    assert(imageDF.count() == 6)
+  }
+
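+  // Reader output sketch (assumed field layout, matching the assertions
+  // above): each row holds one image struct of the form
+  //   (origin: String, height: Int, width: Int, nChannels: Int,
+  //    mode: Int /* OpenCV CvType, e.g. CV_8UC3 */, data: Array[Byte])
+  // so fields can be inspected directly, e.g.:
+  //   DLImageReader.readImages(path, sc)
+  //     .select("image.origin", "image.height", "image.width").show()
+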
+  "read grayscale image" should "work" in {
+    val resource = getClass().getClassLoader().getResource("gray/gray.bmp")
+    val df = DLImageReader.readImages(resource.getFile, sc)
+    assert(df.count() == 1)
+    val r = df.head().getAs[Row](0)
+    assert(r.getString(0).endsWith("gray.bmp"))
+    assert(r.getInt(1) == 50)
+    assert(r.getInt(2) == 50)
+    assert(r.getInt(3) == 1)
+    assert(r.getInt(4) == CvType.CV_8UC1)
+  }
+}
diff --git a/spark/dl/src/test/scala/com/intel/analytics/bigdl/optim/dlframes/DLImageTransformerSpec.scala b/spark/dl/src/test/scala/com/intel/analytics/bigdl/optim/dlframes/DLImageTransformerSpec.scala
new file mode 100644
index 00000000000..4c2fd5cdcfd
--- /dev/null
+++ b/spark/dl/src/test/scala/com/intel/analytics/bigdl/optim/dlframes/DLImageTransformerSpec.scala
@@ -0,0 +1,138 @@
+/*
+ * Copyright 2016 The BigDL Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.intel.analytics.bigdl.dlframes
+
+import com.intel.analytics.bigdl.tensor.TensorNumericMath.TensorNumeric.NumericFloat
+import com.intel.analytics.bigdl.transform.vision.image.MatToTensor
+import com.intel.analytics.bigdl.transform.vision.image.augmentation.{CenterCrop, ChannelNormalize, Resize}
+import com.intel.analytics.bigdl.utils.Engine
+import com.intel.analytics.bigdl.utils.RandomGenerator.RNG
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{Row, SQLContext}
+import org.opencv.core.CvType
+import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers}
+
+import scala.util.Random
+
+class DLImageTransformerSpec extends FlatSpec with Matchers with BeforeAndAfter {
+  private var sc: SparkContext = _
+  private var sqlContext: SQLContext = _
+  private val pascalResource = getClass.getClassLoader.getResource("pascal/")
+  private val imageNetResource = getClass.getClassLoader.getResource("imagenet/")
+
+  before {
+    val conf = Engine.createSparkConf().setAppName("Test DLImageTransformer").setMaster("local[1]")
+    sc = SparkContext.getOrCreate(conf)
+    sqlContext = new SQLContext(sc)
+    Random.setSeed(42)
+    RNG.setSeed(42)
+    Engine.init
+  }
+
+  after {
+    if (sc != null) {
+      sc.stop()
+    }
+  }
+
+  "DLTransformer" should "support setters" in {
+    val transformer = Resize(256, 256) -> CenterCrop(224, 224) ->
+      ChannelNormalize(123, 117, 104, 1, 1, 1) -> MatToTensor()
+    val trans = new DLImageTransformer(transformer)
+      .setInputCol("image1")
+      .setOutputCol("features1")
+    assert(trans.getInputCol == "image1")
+    assert(trans.getOutputCol == "features1")
+  }
+
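+  // Usage sketch of the chained-transformer API exercised below (assumed
+  // names, mirroring the tests): vision transformers compose with `->` and
+  // the resulting chain is wrapped in a DLImageTransformer.
+  //   val chain = Resize(256, 256) -> CenterCrop(224, 224) ->
+  //     ChannelNormalize(123, 117, 104, 1, 1, 1) -> MatToTensor[Float]()
+  //   val features = new DLImageTransformer(chain)
+  //     .setInputCol("image").setOutputCol("features")
+  //     .transform(imageDF)  // float image struct in the "features" column
+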
+  "DLTransformer" should "have correct result with pascal images" in {
+    val imageDF = DLImageReader.readImages(pascalResource.getFile, sc)
+    assert(imageDF.count() == 1)
+    val transformer = Resize(256, 256) -> CenterCrop(224, 224) ->
+      ChannelNormalize(123, 117, 104, 1, 1, 1) -> MatToTensor()
+    val transformedDF = new DLImageTransformer(transformer)
+      .setInputCol("image")
+      .setOutputCol("features")
+      .transform(imageDF)
+    val r = transformedDF.select("features").rdd.first().getAs[Row](0)
+    assert(r.getString(0).endsWith("pascal/000025.jpg"))
+    assert(r.getInt(1) == 224)
+    assert(r.getInt(2) == 224)
+    assert(r.getInt(3) == 3)
+    assert(r.getInt(4) == CvType.CV_32FC3)
+    assert(r.getSeq[Float](5).take(6).toArray.deep == Array(-30, -50, -69, -84, -46, -25).deep)
+  }
+
+  "DLTransformer" should "have correct result without MatToTensor" in {
+    val imageDF = DLImageReader.readImages(pascalResource.getFile, sc)
+    assert(imageDF.count() == 1)
+    val transformer = Resize(256, 256) -> CenterCrop(224, 224) ->
+      ChannelNormalize(123, 117, 104, 1, 1, 1)
+    val transformedDF = new DLImageTransformer(transformer)
+      .setInputCol("image")
+      .setOutputCol("features")
+      .transform(imageDF)
+    val r = transformedDF.select("features").rdd.first().getAs[Row](0)
+    assert(r.getString(0).endsWith("pascal/000025.jpg"))
+    assert(r.getInt(1) == 224)
+    assert(r.getInt(2) == 224)
+    assert(r.getInt(3) == 3)
+    assert(r.getInt(4) == CvType.CV_32FC3)
+    assert(r.getSeq[Float](5).take(6).toArray.deep == Array(-30, -50, -69, -84, -46, -25).deep)
+  }
+
+  "DLTransformer" should "ensure imf2Row and row2IMF are reversible" in {
+    val imageDF = DLImageReader.readImages(pascalResource.getFile, sc)
+    assert(imageDF.count() == 1)
+    val transformer = Resize(256, 256) -> CenterCrop(224, 224) ->
+      ChannelNormalize(123, 117, 104, 1, 1, 1) -> MatToTensor()
+    val transformedDF = new DLImageTransformer(transformer)
+      .setInputCol("image")
+      .setOutputCol("features")
+      .transform(imageDF)
+    val r = transformedDF.select("features").rdd.first().getAs[Row](0)
+    val convertedR = DLImageSchema.imf2Row(DLImageSchema.row2IMF(r))
+
+    assert(r.getSeq[Float](5).toArray.deep == convertedR.getAs[Array[Float]](5).deep)
+  }
+
+  "DLTransformer" should "transform grayscale images" in {
+    val resource = getClass().getClassLoader().getResource("gray/gray.bmp")
+    val df = DLImageReader.readImages(resource.getFile, sc)
+    val dlTransformer = new DLImageTransformer(Resize(28, 28) -> MatToTensor[Float]())
+      .setInputCol("image")
+      .setOutputCol("features")
+    val r = dlTransformer.transform(df).select("features").rdd.first().getAs[Row](0)
+    assert(r.getString(0).endsWith("gray.bmp"))
+    assert(r.getInt(1) == 28)
+    assert(r.getInt(2) == 28)
+    assert(r.getInt(3) == 1)
+    assert(r.getInt(4) == CvType.CV_32FC1)
+  }
+
+  "DLTransformer" should "report an error when input and output columns are the same" in {
+    val resource = getClass().getClassLoader().getResource("gray/gray.bmp")
+    val df = DLImageReader.readImages(resource.getFile, sc)
+    val dlTransformer = new DLImageTransformer(Resize(28, 28) -> MatToTensor[Float]())
+      .setInputCol("image")
+      .setOutputCol("image")
+    intercept[IllegalArgumentException] {
+      dlTransformer.transform(df)
+    }
+  }
+}
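+
+// Round-trip note (assumed helper names, as exercised in the reversibility
+// test above): DLImageSchema.row2IMF rebuilds an ImageFeature from a Row and
+// DLImageSchema.imf2Row serializes it back, so for a transformed row r,
+//   DLImageSchema.imf2Row(DLImageSchema.row2IMF(r)).getAs[Array[Float]](5)
+// is expected to equal r.getSeq[Float](5).toArray element for element.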