Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NNFrames: support caching training data on Disk #1588

Merged
merged 6 commits into from
Sep 20, 2019
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion docs/docs/APIGuide/PipelineAPI/nnframes.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ Each `Preprocessing` conducts a data conversion step in the preprocessing phase,
`Preprocessing` for popular data types like Image, Array or Vector are provided in package
`com.intel.analytics.zoo.feature`, while user can also develop customized `Preprocessing`.

NNEstimator and NNClassifier also support setting the caching level for the training data.
Options are "DRAM", "PMEM" or "DISK_AND_DRAM". If DISK_AND_DRAM(numSlice) is used, only 1/numSlice
data will be loaded into memory during training time. By default, DRAM mode is used and all data
are cached in memory.

By default, `SeqToTensor` is used to convert an array or Vector to a 1-dimension Tensor.
Using the `Preprocessing` allows `NNEstimator` to cache only the raw data and decrease the
memory consumption during feature conversion and training, it also enables the model to digest
Expand Down Expand Up @@ -113,7 +118,7 @@ df = self.sqlContext.createDataFrame(data, schema)
model = Sequential().add(Linear(2, 2))
criterion = MSECriterion()
estimator = NNEstimator(model, criterion, SeqToTensor([2]), ArrayToTensor([2]))\
.setBatchSize(4).setLearningRate(0.2).setMaxEpoch(40)
.setBatchSize(4).setLearningRate(0.2).setMaxEpoch(40)
nnModel = estimator.fit(df)
res = nnModel.transform(df)
```
Expand Down
10 changes: 10 additions & 0 deletions pyzoo/test/zoo/pipeline/nnframes/test_nn_classifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,16 @@ def test_nnEstimator_fit_gradient_clipping(self):
estimator.setGradientClippingByL2Norm(1.2)
estimator.fit(df)

def test_nnEstimator_fit_with_Cache_Disk(self):
model = Sequential().add(Linear(2, 2))
criterion = MSECriterion()
estimator = NNEstimator(model, criterion, SeqToTensor([2]), ArrayToTensor([2])) \
.setBatchSize(4).setLearningRate(0.2).setMaxEpoch(2) \
.setDataCacheLevel("DISK_AND_DRAM", 4)

df = self.get_estimator_df()
estimator.fit(df)

def test_nnEstimator_fit_with_non_default_featureCol(self):
model = Sequential().add(Linear(2, 2))
criterion = MSECriterion()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
.setBatchSize(32) \
.setMaxEpoch(20) \
.setFeaturesCol("embedding") \
.setDataCacheLevel("DISK_AND_DRAM", 4) \
.setCachingSample(False)

pipeline = Pipeline(stages=[preTrainedNNModel, classifier])
Expand Down
19 changes: 19 additions & 0 deletions pyzoo/zoo/pipeline/nnframes/nn_classifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ def __init__(self, model, criterion,
self.checkpoint_config = None
self.validation_summary = None
self.endWhen = None
self.dataCacheLevel = "DRAM"

def setSamplePreprocessing(self, val):
"""
Expand Down Expand Up @@ -255,6 +256,24 @@ def getEndWhen(self):
"""
return self.endWhen

def setDataCacheLevel(self, level, numSlice=None):
"""
:param level: string, "DRAM", "PMEM" or "DISK_AND_DRAM".
If it's DRAM, will cache dataset into dynamic random-access memory
If it's PMEM, will cache dataset into Intel Optane DC Persistent Memory
If it's DISK_AND_DRAM, will cache dataset into disk, and only hold 1/numSlice
of the data into memory during the training. After going through the
1/numSlice, we will release the current cache, and load another slice into
memory.
"""
pythonBigDL_method_name = "setDataCacheLevel"
callBigDlFunc(self.bigdl_type, pythonBigDL_method_name, self.value, level, numSlice)
self.dataCacheLevel = level if numSlice is None else (level, numSlice)
return self

def getDataCacheLevel(self):
return self.dataCacheLevel

def setLearningRate(self, val):
"""
Sets the value of :py:attr:`learningRate`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package com.intel.analytics.zoo.feature.pmem

import scala.collection.mutable.ArrayBuffer

sealed trait MemoryType
sealed trait MemoryType extends Serializable

case object PMEM extends MemoryType

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,12 @@ import com.intel.analytics.bigdl.utils.serializer.ModuleLoader
import com.intel.analytics.bigdl.utils.{File, T}
import com.intel.analytics.bigdl.visualization.{TrainSummary, ValidationSummary}
import com.intel.analytics.bigdl.{Criterion, DataSet, Module}
import com.intel.analytics.zoo.feature.FeatureSet
import com.intel.analytics.zoo.feature.common.{Preprocessing, _}
import com.intel.analytics.zoo.feature.pmem.{DRAM, MemoryType}
import com.intel.analytics.zoo.pipeline.api.Net
import com.intel.analytics.zoo.pipeline.api.keras.layers.utils.EngineRef
import com.intel.analytics.zoo.pipeline.api.keras.models.InternalDistriOptimizer
import org.apache.hadoop.fs.Path
import org.apache.log4j.Logger
import org.apache.spark.SparkContext
Expand Down Expand Up @@ -134,6 +137,20 @@ private[nnframes] trait TrainingParams[@specialized(Float, Double) T] extends Pa
* Get check point path.
*/
def getCheckpointPath: String = $(checkpointPath)

/**
* How to cache the training data, options are defined in com.intel.analytics.zoo.feature.pmem.
* If it's DRAM, will cache dataset into dynamic random-access memory
* If it's PMEM, will cache dataset into Intel Optane DC Persistent Memory
* If it's DISK_AND_DRAM(numSlice: Int), will cache dataset into disk, and only hold 1/n
* of the data into memory during the training. After going through the 1/n, we will
* release the current cache, and load another 1/n into memory.
* By default, DRAM is used.
*/
final val dataCacheLevel = new Param[MemoryType](
this, "dataCacheLevel", "cache the data in memory, disk, ")

def getDataCacheLevel: MemoryType = $(dataCacheLevel)
}

/**
Expand Down Expand Up @@ -236,6 +253,11 @@ class NNEstimator[T: ClassTag] private[zoo] (
}
setDefault(cachingSample, true)

def setDataCacheLevel(value: MemoryType): this.type = {
set(dataCacheLevel, value)
}
setDefault(dataCacheLevel, DRAM)

/**
* Clear clipping params, in this case, clipping will not be applied.
*/
Expand Down Expand Up @@ -358,7 +380,7 @@ class NNEstimator[T: ClassTag] private[zoo] (

private def getDataSet(
dataFrame: DataFrame,
batchSize: Int): DataSet[MiniBatch[T]] = {
batchSize: Int): FeatureSet[MiniBatch[T]] = {

val sp = $(samplePreprocessing).asInstanceOf[Preprocessing[(Any, Option[Any]), Sample[T]]]
val featureColIndex = dataFrame.schema.fieldIndex($(featuresCol))
Expand All @@ -378,10 +400,11 @@ class NNEstimator[T: ClassTag] private[zoo] (
val labels = labelFunc(row)
(features, labels)
}

val initialDataSet = if ($(cachingSample)) {
DataSet.rdd(sp.apply(featureAndLabel))
FeatureSet.rdd(sp.apply(featureAndLabel), memoryType = $(dataCacheLevel))
} else {
DataSet.rdd(featureAndLabel).transform(sp)
FeatureSet.rdd(featureAndLabel, memoryType = $(dataCacheLevel)).transform(sp)
}

initialDataSet.transform(SampleToMiniBatch[T](batchSize))
Expand All @@ -390,7 +413,7 @@ class NNEstimator[T: ClassTag] private[zoo] (
protected override def internalFit(dataFrame: DataFrame): NNModel[T] = {
val trainingDataSet = getDataSet(dataFrame, $(batchSize))
val endTrigger = if (isSet(endWhen)) $(endWhen) else Trigger.maxEpoch($(maxEpoch))
val optimizer = Optimizer(model, trainingDataSet, criterion)
val optimizer = new InternalDistriOptimizer(model, null, criterion)
.setOptimMethod($(optimMethod))
.setEndWhen(endTrigger)

Expand All @@ -416,11 +439,16 @@ class NNEstimator[T: ClassTag] private[zoo] (
optimizer.setConstantGradientClipping(constantClippingValues._1, constantClippingValues._2)
}

val validationFeatureset = if (validationTrigger.isDefined) {
getDataSet(validationDF, validationBatchSize)
} else {
null
}

if (validationTrigger.isDefined) {
val validationSamples = getDataSet(validationDF, validationBatchSize)
optimizer.setValidation(
validationTrigger.get,
validationSamples,
validationFeatureset,
validationMethods)
if (this.validationSummary.isDefined) {
optimizer.setValidationSummary(this.validationSummary.get)
Expand All @@ -438,8 +466,15 @@ class NNEstimator[T: ClassTag] private[zoo] (
}
}

val optimizedModel = optimizer.optimize()
wrapBigDLModel(optimizedModel)
optimizer.train(
trainingDataSet,
criterion,
Some(endTrigger),
if (isSet(this.checkpointPath)) Some($(checkpointTrigger)) else None,
validationFeatureset,
validationMethods
)
wrapBigDLModel(model)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import com.intel.analytics.bigdl.{Criterion, Module}
import com.intel.analytics.zoo.common.PythonZoo
import com.intel.analytics.zoo.feature.common._
import com.intel.analytics.zoo.feature.image.RowToImageFeature
import com.intel.analytics.zoo.feature.pmem._
import com.intel.analytics.zoo.pipeline.nnframes._
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.DataFrame
Expand Down Expand Up @@ -168,6 +169,17 @@ class PythonNNFrames[T: ClassTag](implicit ev: TensorNumeric[T]) extends PythonZ
estimator.setEndWhen(trigger)
}

def setDataCacheLevel(estimator: NNEstimator[T], level: String, numSlice: Int = 4): NNEstimator[T] = {
val memType = level.trim.toUpperCase match {
case "DRAM" => DRAM
case "PMEM" => PMEM
case "DISK_AND_DRAM" => DISK_AND_DRAM(numSlice)
case "DIRECT" => DIRECT
case _ => throw new IllegalArgumentException(s"$level is not supported.")
}
estimator.setDataCacheLevel(memType)
}

def setCheckpoint(
estimator: NNEstimator[T],
path: String,
Expand Down