From d9758fba72bae80c5bb1564d85fd61d4794ed940 Mon Sep 17 00:00:00 2001
From: Le-Zheng <30695225+Le-Zheng@users.noreply.github.com>
Date: Sun, 28 Jun 2020 16:32:16 +0800
Subject: [PATCH] fix throughput (#3017)

* fix throughput

* update
---
 .../bigdl/dllib/optim/DistriOptimizerV2.scala | 29 ++++++++++++-------
 1 file changed, 19 insertions(+), 10 deletions(-)

diff --git a/scala/dllib/src/main/scala/com/intel/analytics/bigdl/dllib/optim/DistriOptimizerV2.scala b/scala/dllib/src/main/scala/com/intel/analytics/bigdl/dllib/optim/DistriOptimizerV2.scala
index a9ea527c3e8..68c774d9a3a 100644
--- a/scala/dllib/src/main/scala/com/intel/analytics/bigdl/dllib/optim/DistriOptimizerV2.scala
+++ b/scala/dllib/src/main/scala/com/intel/analytics/bigdl/dllib/optim/DistriOptimizerV2.scala
@@ -35,6 +35,8 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.{SparkContext, TaskContext}
 
 import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.Future
 import scala.reflect.ClassTag
 
 object DistriOptimizerV2 extends AbstractOptimizer {
@@ -176,7 +178,9 @@ object DistriOptimizerV2 extends AbstractOptimizer {
       Run the forward/backward passes using multiple threads in each partition, and track
       the number of model updates that finished before the thread timeout fired.
      */
-    val training = dataRDD.zipPartitions(models, preservesPartitioning = true) { (data, iter) =>
+    trainingTrace.traceIteration({
+      val successModels = dataRDD.zipPartitions(models, preservesPartitioning = true) {
+        (data, iter) =>
       val cached = iter.next()
       /*
         Note: All models in `cached` share the same storage for weights, so we only need to
@@ -202,11 +206,10 @@ object DistriOptimizerV2 extends AbstractOptimizer {
 
       recordsNum += results.records
       Iterator.single(results.successed)
-    }
-
-    val successModels = trainingTrace.traceIteration(training.reduce(_ + _))
+      }.reduce(_ + _)
 
-    parameterSync(lossSum.value, successModels, cacheOfMaster, models, context)
+      parameterSync(lossSum.value, successModels, cacheOfMaster, models, context)
+    })
 
     driverStatesUpdate(cacheOfMaster, recordsNum.value,
       context, trainingTrace, metrics)
@@ -384,6 +387,7 @@ object DistriOptimizerV2 extends AbstractOptimizer {
       context: TrainingContext[T],
       metrics: Metrics)(implicit ev: TensorNumeric[T]): TrainingResults = {
     val stackSize = data.head.size()
+    var tasks: ArrayBuffer[Future[_]] = new ArrayBuffer()
 
     // ======================Start train models===================================
     val modelsResult = TrainingTrace.time (
@@ -417,9 +421,13 @@ object DistriOptimizerV2 extends AbstractOptimizer {
       metrics
     )(Array(PUT_GRADIENT))
 
-    (0 until context.subModelNumber).foreach { i =>
-      cached.localModels(i).training()
-      cached.localModels(i).zeroGradParameters()
+    tasks ++= Engine.default.invoke {
+      (0 until context.subModelNumber).map { i =>
+        () => {
+          cached.localModels(i).training()
+          cached.localModels(i).zeroGradParameters()
+        }
+      }
     }
 
     TrainingResults(modelsResult.size, lossSum, modelsResult.size * stackSize)
@@ -446,7 +454,8 @@ object DistriOptimizerV2 extends AbstractOptimizer {
       cacheOfMaster: MasterCache[T],
       recordsNum: Int,
       context: TrainingContext[T],
-      trainingTrace: TrainingTrace, metrics: Metrics)(
+      trainingTrace: TrainingTrace,
+      metrics: Metrics)(
       implicit ev: TensorNumeric[T]): Unit = {
     val optimMethods = cacheOfMaster.optimMethods
     val updateScore = cacheOfMaster.validationMethods.isDefined
@@ -462,7 +471,7 @@ object DistriOptimizerV2 extends AbstractOptimizer {
     val _header = header(trainingTrace.epochs, records,
       context.numSamples, trainingTrace.iterations, trainingTakes)
     val loss = context.state[Float](StateEntry.LOSS)
-    logger.info(s"${_header} Trained $recordsNum records in $iterationTakes seconds. " +
+    logger.info(s"${_header} Trained $recordsNum records in ${iterationTakes / 1e9f} seconds. " +
      s"Throughput is $throughput records/second. " +
      s"Loss is $loss. " +
      s"${getHyperParameterLog(optimMethods)}")
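
Reviewer note: the heart of this fix is widening the timed region. Previously `traceIteration` wrapped only the `reduce` over the training RDD, so `parameterSync` ran outside the measured window while its records still counted toward throughput, inflating the reported figure. Below is a minimal sketch of the new wrapping shape, assuming a stand-in `traceIteration` that merely times a block; the real `TrainingTrace` also maintains epoch/iteration counters, and `runDistributedTraining`/`parameterSync` here are hypothetical stand-ins, not BigDL API.

    object TraceSketch {
      // Stand-in for TrainingTrace.traceIteration: run a block and report
      // how long it took in nanoseconds.
      def traceIteration[A](block: => A): (A, Long) = {
        val start = System.nanoTime()
        val result = block
        (result, System.nanoTime() - start)
      }

      def main(args: Array[String]): Unit = {
        // Hypothetical stand-ins for the distributed job and the sync step.
        def runDistributedTraining(): Int = { Thread.sleep(30); 4 }
        def parameterSync(successModels: Int): Unit = { Thread.sleep(10) }

        // After the patch, the timed block covers the zipPartitions/reduce job
        // *and* parameterSync, so iterationTakes spans the whole iteration.
        val (_, iterationTakes) = traceIteration {
          val successModels = runDistributedTraining()
          parameterSync(successModels)
        }
        println(s"iteration took ${iterationTakes / 1e9f} seconds")
      }
    }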
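The other half of the fix is the unit conversion in the log line: `iterationTakes` is measured in nanoseconds, so logging it raw made the "seconds" figure inconsistent with the reported throughput by a factor of 1e9. A self-contained sketch of the corrected arithmetic follows; `logIterationStats` and `elapsedNanos` are illustrative names, not BigDL API.

    object ThroughputSketch {
      def logIterationStats(recordsNum: Int, elapsedNanos: Long): Unit = {
        val seconds = elapsedNanos / 1e9f      // ns -> s, the unit fix in this patch
        val throughput = recordsNum / seconds  // records/second, in consistent units
        println(f"Trained $recordsNum records in $seconds%.3f seconds. " +
          f"Throughput is $throughput%.1f records/second.")
      }

      def main(args: Array[String]): Unit = {
        // e.g. 51200 records in 2.5e9 ns (2.5 s) -> 20480.0 records/second
        logIterationStats(51200, 2500000000L)
      }
    }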
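Finally, the `trainModels` change replaces the serial `foreach` reset loop with `Engine.default.invoke`, which submits one `training()`/`zeroGradParameters()` thunk per sub-model to BigDL's thread pool and returns their futures (collected into `tasks`), presumably so the resets can overlap with the surrounding driver work. A rough sketch of that fan-out pattern, using the standard library's `ExecutionContext` in place of BigDL's `Engine`; all names below are illustrative.

    import scala.collection.mutable.ArrayBuffer
    import scala.concurrent.{Await, Future}
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration._

    object InvokeSketch {
      def main(args: Array[String]): Unit = {
        val subModelNumber = 4
        val tasks = new ArrayBuffer[Future[Unit]]()

        // One thunk per sub-model, run on the pool instead of serially;
        // in the patch the body is localModels(i).training() and
        // localModels(i).zeroGradParameters().
        tasks ++= (0 until subModelNumber).map { i =>
          Future {
            println(s"reset sub-model $i on ${Thread.currentThread().getName}")
          }
        }

        Await.ready(Future.sequence(tasks.toSeq), 10.seconds)
      }
    }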