Skip to content

Commit

Permalink
fix throughput (#3017)
Browse files Browse the repository at this point in the history
* fix throughput

* update
  • Loading branch information
Le-Zheng authored Jun 28, 2020
1 parent d1bea93 commit 7de9256
Showing 1 changed file with 19 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkContext, TaskContext}

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.Future
import scala.reflect.ClassTag

object DistriOptimizerV2 extends AbstractOptimizer {
Expand Down Expand Up @@ -176,7 +178,9 @@ object DistriOptimizerV2 extends AbstractOptimizer {
Run the forwards/backwards pass using multiple threads in each partition, and track the
number of model updates that finished before the thread timeout mechanism.
*/
val training = dataRDD.zipPartitions(models, preservesPartitioning = true) { (data, iter) =>
trainingTrace.traceIteration({
val successModels = dataRDD.zipPartitions(models, preservesPartitioning = true) {
(data, iter) =>
val cached = iter.next()
/*
Note: All models in `cached` share the same storage for weights, so we only need to
Expand All @@ -202,11 +206,10 @@ object DistriOptimizerV2 extends AbstractOptimizer {
recordsNum += results.records

Iterator.single(results.successed)
}

val successModels = trainingTrace.traceIteration(training.reduce(_ + _))
}.reduce(_ + _)

parameterSync(lossSum.value, successModels, cacheOfMaster, models, context)
parameterSync(lossSum.value, successModels, cacheOfMaster, models, context)
})

driverStatesUpdate(cacheOfMaster, recordsNum.value,
context, trainingTrace, metrics)
Expand Down Expand Up @@ -384,6 +387,7 @@ object DistriOptimizerV2 extends AbstractOptimizer {
context: TrainingContext[T],
metrics: Metrics)(implicit ev: TensorNumeric[T]): TrainingResults = {
val stackSize = data.head.size()
var tasks: ArrayBuffer[Future[_]] = new ArrayBuffer()

// ======================Start train models===================================
val modelsResult = TrainingTrace.time (
Expand Down Expand Up @@ -417,9 +421,13 @@ object DistriOptimizerV2 extends AbstractOptimizer {
metrics
)(Array(PUT_GRADIENT))

(0 until context.subModelNumber).foreach { i =>
cached.localModels(i).training()
cached.localModels(i).zeroGradParameters()
tasks ++= Engine.default.invoke {
(0 until context.subModelNumber).map { i =>
() => {
cached.localModels(i).training()
cached.localModels(i).zeroGradParameters()
}
}
}

TrainingResults(modelsResult.size, lossSum, modelsResult.size * stackSize)
Expand All @@ -446,7 +454,8 @@ object DistriOptimizerV2 extends AbstractOptimizer {
cacheOfMaster: MasterCache[T],
recordsNum: Int,
context: TrainingContext[T],
trainingTrace: TrainingTrace, metrics: Metrics)(
trainingTrace: TrainingTrace,
metrics: Metrics)(
implicit ev: TensorNumeric[T]): Unit = {
val optimMethods = cacheOfMaster.optimMethods
val updateScore = cacheOfMaster.validationMethods.isDefined
Expand All @@ -462,7 +471,7 @@ object DistriOptimizerV2 extends AbstractOptimizer {
val _header = header(trainingTrace.epochs, records, context.numSamples,
trainingTrace.iterations, trainingTakes)
val loss = context.state[Float](StateEntry.LOSS)
logger.info(s"${_header} Trained $recordsNum records in $iterationTakes seconds. " +
logger.info(s"${_header} Trained $recordsNum records in ${(iterationTakes) /1e9f} seconds. " +
s"Throughput is $throughput records/second. " +
s"Loss is $loss. " +
s"${getHyperParameterLog(optimMethods)}")
Expand Down

0 comments on commit 7de9256

Please sign in to comment.