From d7b2b85bfaab1a85f53bcd9b1e5005acaff66ecc Mon Sep 17 00:00:00 2001 From: tosky001 Date: Tue, 16 Jan 2018 14:09:58 +0800 Subject: [PATCH 1/2] bug fix: DLModel prediction (#4) Make sure DLModel.train=False when predicting in pipeline API --- .../org/apache/spark/ml/DLEstimator.scala | 3 ++- .../bigdl/optim/DLEstimatorSpec.scala | 21 +++++++++++++++++++ 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/spark/dl/src/main/scala/org/apache/spark/ml/DLEstimator.scala b/spark/dl/src/main/scala/org/apache/spark/ml/DLEstimator.scala index a0433498751..d125f98482d 100644 --- a/spark/dl/src/main/scala/org/apache/spark/ml/DLEstimator.scala +++ b/spark/dl/src/main/scala/org/apache/spark/ml/DLEstimator.scala @@ -391,7 +391,8 @@ class DLModel[@specialized(Float, Double) T: ClassTag]( val localBatchSize = $(batchSize) val resultRDD = dataFrame.rdd.mapPartitions { rowIter => - val localModel = modelBroadCast.value() + // call the evaluate method to enable DLModel.train=False during the predict process + val localModel = modelBroadCast.value().evaluate() rowIter.grouped(localBatchSize).flatMap { rowBatch => val samples = rowBatch.map { row => val features = featureFunc(row, featureColIndex) diff --git a/spark/dl/src/test/scala/com/intel/analytics/bigdl/optim/DLEstimatorSpec.scala b/spark/dl/src/test/scala/com/intel/analytics/bigdl/optim/DLEstimatorSpec.scala index 04d41917c5b..89b2d1ca4d8 100644 --- a/spark/dl/src/test/scala/com/intel/analytics/bigdl/optim/DLEstimatorSpec.scala +++ b/spark/dl/src/test/scala/com/intel/analytics/bigdl/optim/DLEstimatorSpec.scala @@ -91,6 +91,27 @@ class DLEstimatorSpec extends FlatSpec with Matchers with BeforeAndAfter { assert(correct > nRecords * 0.8) } + "An DLEstimator" should "throws exception when DLModel is predicting with DLModel.train=True" in { + val model = new Sequential().add(Linear[Float](6, 2)) + .add(Dropout[Float](initP = 0.5)) + .add(LogSoftMax[Float]) + val criterion = ClassNLLCriterion[Float]() + val estimator = new DLEstimator[Float](model, criterion, Array(6), Array(1)) + .setBatchSize(nRecords) + .setOptimMethod(new LBFGS[Float]()) + .setLearningRate(0.1) + .setMaxEpoch(maxEpoch) + val data = sc.parallelize(smallData) + val df = sqlContext.createDataFrame(data).toDF("features", "label") + val dlModel = estimator.fit(df) + dlModel.isInstanceOf[DLModel[_]] should be(true) + val correct = dlModel.transform(df).select("label", "prediction").rdd.filter { + case Row(label: Double, prediction: Seq[Double]) => + label == prediction.indexOf(prediction.max) + 1 + }.count() + assert(correct > nRecords * 0.8) + } + "An DLEstimator" should "support different FEATURE types" in { val model = new Sequential().add(Linear[Float](6, 2)).add(LogSoftMax[Float]) val criterion = ClassNLLCriterion[Float]() From ded0ea89fa5ba133c933f604153175b4b48dcaad Mon Sep 17 00:00:00 2001 From: Xu Xiao Date: Tue, 30 Jan 2018 11:41:27 +0800 Subject: [PATCH 2/2] 1. broadcast transformer in DLModel.transform ; 2. remove useless ut --- .../org/apache/spark/ml/DLEstimator.scala | 9 ++++---- .../bigdl/optim/DLEstimatorSpec.scala | 21 ------------------- 2 files changed, 5 insertions(+), 25 deletions(-) diff --git a/spark/dl/src/main/scala/org/apache/spark/ml/DLEstimator.scala b/spark/dl/src/main/scala/org/apache/spark/ml/DLEstimator.scala index d125f98482d..6408d37fe8f 100644 --- a/spark/dl/src/main/scala/org/apache/spark/ml/DLEstimator.scala +++ b/spark/dl/src/main/scala/org/apache/spark/ml/DLEstimator.scala @@ -387,12 +387,13 @@ class DLModel[@specialized(Float, Double) T: ClassTag]( val featureColIndex = dataFrame.schema.fieldIndex($(featuresCol)) val featureFunc = getConvertFunc(featureType) val sc = dataFrame.sqlContext.sparkContext - val modelBroadCast = ModelBroadcast[T]().broadcast(sc, model) + val modelBroadCast = ModelBroadcast[T]().broadcast(sc, model.evaluate()) val localBatchSize = $(batchSize) + val transformerBC = sc.broadcast(SampleToMiniBatch[T](localBatchSize)) val resultRDD = dataFrame.rdd.mapPartitions { rowIter => - // call the evaluate method to enable DLModel.train=False during the predict process - val localModel = modelBroadCast.value().evaluate() + val localModel = modelBroadCast.value() + val transformer = transformerBC.value.cloneTransformer() rowIter.grouped(localBatchSize).flatMap { rowBatch => val samples = rowBatch.map { row => val features = featureFunc(row, featureColIndex) @@ -402,7 +403,7 @@ class DLModel[@specialized(Float, Double) T: ClassTag]( } Sample(Tensor(featureBuffer.toArray, featureSize)) }.toIterator - val predictions = SampleToMiniBatch(localBatchSize).apply(samples).flatMap { batch => + val predictions = transformer(samples).flatMap { batch => val batchResult = localModel.forward(batch.getInput()) batchResult.toTensor.split(1).map(outputToPrediction) } diff --git a/spark/dl/src/test/scala/com/intel/analytics/bigdl/optim/DLEstimatorSpec.scala b/spark/dl/src/test/scala/com/intel/analytics/bigdl/optim/DLEstimatorSpec.scala index 89b2d1ca4d8..04d41917c5b 100644 --- a/spark/dl/src/test/scala/com/intel/analytics/bigdl/optim/DLEstimatorSpec.scala +++ b/spark/dl/src/test/scala/com/intel/analytics/bigdl/optim/DLEstimatorSpec.scala @@ -91,27 +91,6 @@ class DLEstimatorSpec extends FlatSpec with Matchers with BeforeAndAfter { assert(correct > nRecords * 0.8) } - "An DLEstimator" should "throws exception when DLModel is predicting with DLModel.train=True" in { - val model = new Sequential().add(Linear[Float](6, 2)) - .add(Dropout[Float](initP = 0.5)) - .add(LogSoftMax[Float]) - val criterion = ClassNLLCriterion[Float]() - val estimator = new DLEstimator[Float](model, criterion, Array(6), Array(1)) - .setBatchSize(nRecords) - .setOptimMethod(new LBFGS[Float]()) - .setLearningRate(0.1) - .setMaxEpoch(maxEpoch) - val data = sc.parallelize(smallData) - val df = sqlContext.createDataFrame(data).toDF("features", "label") - val dlModel = estimator.fit(df) - dlModel.isInstanceOf[DLModel[_]] should be(true) - val correct = dlModel.transform(df).select("label", "prediction").rdd.filter { - case Row(label: Double, prediction: Seq[Double]) => - label == prediction.indexOf(prediction.max) + 1 - }.count() - assert(correct > nRecords * 0.8) - } - "An DLEstimator" should "support different FEATURE types" in { val model = new Sequential().add(Linear[Float](6, 2)).add(LogSoftMax[Float]) val criterion = ClassNLLCriterion[Float]()