[SPARK-32140][ML][PYSPARK] Add training summary to FMClassificationModel
### What changes were proposed in this pull request?
Add training summary for FMClassificationModel...
### Why are the changes needed?
So that users can get the training process status, such as the loss value at each iteration and the total number of iterations.

### Does this PR introduce _any_ user-facing change?
Yes. New APIs (see the sketch below):
- `FMClassificationModel.summary`
- `FMClassificationModel.evaluate`
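
A minimal usage sketch of the new API. The `train`/`test` DataFrames and the `setMaxIter` value are hypothetical, not part of this PR; the summary fields shown come from `TrainingSummary` and `BinaryClassificationSummary`:

```scala
import org.apache.spark.ml.classification.FMClassifier

// Assumes train/test DataFrames with the default "features"/"label" columns.
val model = new FMClassifier().setMaxIter(10).fit(train)

val ts = model.summary                      // FMClassificationTrainingSummary; throws if absent
println(ts.totalIterations)                 // iterations actually run
println(ts.objectiveHistory.mkString(", ")) // loss at each iteration
println(ts.accuracy)

val es = model.evaluate(test)               // FMClassificationSummary on held-out data
println(es.areaUnderROC)
```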

### How was this patch tested?
new tests

Closes #28960 from huaxingao/fm_summary.

Authored-by: Huaxin Gao <[email protected]>
Signed-off-by: Huaxin Gao <[email protected]>
huaxingao committed Jul 15, 2020
1 parent cf22d94 commit b05f309
Showing 7 changed files with 257 additions and 32 deletions.
mllib/src/main/scala/org/apache/spark/ml/classification/FMClassifier.scala
@@ -30,7 +30,7 @@ import org.apache.spark.ml.util.Instrumentation.instrumented
 import org.apache.spark.mllib.linalg.{Vector => OldVector}
 import org.apache.spark.mllib.linalg.VectorImplicits._
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{Dataset, Row}
+import org.apache.spark.sql._
 import org.apache.spark.storage.StorageLevel

 /**
@@ -212,14 +212,34 @@ class FMClassifier @Since("3.0.0") (

     if (handlePersistence) data.persist(StorageLevel.MEMORY_AND_DISK)

-    val coefficients = trainImpl(data, numFeatures, LogisticLoss)
+    val (coefficients, objectiveHistory) = trainImpl(data, numFeatures, LogisticLoss)

     val (intercept, linear, factors) = splitCoefficients(
       coefficients, numFeatures, $(factorSize), $(fitIntercept), $(fitLinear))

     if (handlePersistence) data.unpersist()

-    copyValues(new FMClassificationModel(uid, intercept, linear, factors))
+    createModel(dataset, intercept, linear, factors, objectiveHistory)
   }

+  private def createModel(
+      dataset: Dataset[_],
+      intercept: Double,
+      linear: Vector,
+      factors: Matrix,
+      objectiveHistory: Array[Double]): FMClassificationModel = {
+    val model = copyValues(new FMClassificationModel(uid, intercept, linear, factors))
+    val weightColName = if (!isDefined(weightCol)) "weightCol" else $(weightCol)
+
+    val (summaryModel, probabilityColName, predictionColName) = model.findSummaryModel()
+    val summary = new FMClassificationTrainingSummaryImpl(
+      summaryModel.transform(dataset),
+      probabilityColName,
+      predictionColName,
+      $(labelCol),
+      weightColName,
+      objectiveHistory)
+    model.setSummary(Some(summary))
+  }

   @Since("3.0.0")
@@ -243,14 +263,36 @@ class FMClassificationModel private[classification] (
     @Since("3.0.0") val linear: Vector,
     @Since("3.0.0") val factors: Matrix)
   extends ProbabilisticClassificationModel[Vector, FMClassificationModel]
-  with FMClassifierParams with MLWritable {
+  with FMClassifierParams with MLWritable
+  with HasTrainingSummary[FMClassificationTrainingSummary]{

   @Since("3.0.0")
   override val numClasses: Int = 2

   @Since("3.0.0")
   override val numFeatures: Int = linear.size

+  /**
+   * Gets summary of model on training set. An exception is thrown
+   * if `hasSummary` is false.
+   */
+  @Since("3.1.0")
+  override def summary: FMClassificationTrainingSummary = super.summary
+
+  /**
+   * Evaluates the model on a test dataset.
+   *
+   * @param dataset Test dataset to evaluate model on.
+   */
+  @Since("3.1.0")
+  def evaluate(dataset: Dataset[_]): FMClassificationSummary = {
+    val weightColName = if (!isDefined(weightCol)) "weightCol" else $(weightCol)
+    // Handle possible missing or invalid probability or prediction columns
+    val (summaryModel, probability, predictionColName) = findSummaryModel()
+    new FMClassificationSummaryImpl(summaryModel.transform(dataset),
+      probability, predictionColName, $(labelCol), weightColName)
+  }
+
   @Since("3.0.0")
   override def predictRaw(features: Vector): Vector = {
     val rawPrediction = getRawPrediction(features, intercept, linear, factors)
@@ -328,3 +370,53 @@ object FMClassificationModel extends MLReadable[FMClassificationModel] {
     }
   }
 }
+
+/**
+ * Abstraction for FMClassifier results for a given model.
+ */
+sealed trait FMClassificationSummary extends BinaryClassificationSummary
+
+/**
+ * Abstraction for FMClassifier training results.
+ */
+sealed trait FMClassificationTrainingSummary extends FMClassificationSummary with TrainingSummary
+
+/**
+ * FMClassifier results for a given model.
+ *
+ * @param predictions dataframe output by the model's `transform` method.
+ * @param scoreCol field in "predictions" which gives the probability of each instance.
+ * @param predictionCol field in "predictions" which gives the prediction for a data instance as a
+ *                      double.
+ * @param labelCol field in "predictions" which gives the true label of each instance.
+ * @param weightCol field in "predictions" which gives the weight of each instance.
+ */
+private class FMClassificationSummaryImpl(
+    @transient override val predictions: DataFrame,
+    override val scoreCol: String,
+    override val predictionCol: String,
+    override val labelCol: String,
+    override val weightCol: String)
+  extends FMClassificationSummary
+
+/**
+ * FMClassifier training results.
+ *
+ * @param predictions dataframe output by the model's `transform` method.
+ * @param scoreCol field in "predictions" which gives the probability of each instance.
+ * @param predictionCol field in "predictions" which gives the prediction for a data instance as a
+ *                      double.
+ * @param labelCol field in "predictions" which gives the true label of each instance.
+ * @param weightCol field in "predictions" which gives the weight of each instance.
+ * @param objectiveHistory objective function (scaled loss + regularization) at each iteration.
+ */
+private class FMClassificationTrainingSummaryImpl(
+    predictions: DataFrame,
+    scoreCol: String,
+    predictionCol: String,
+    labelCol: String,
+    weightCol: String,
+    override val objectiveHistory: Array[Double])
+  extends FMClassificationSummaryImpl(
+    predictions, scoreCol, predictionCol, labelCol, weightCol)
+  with FMClassificationTrainingSummary
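
Note on the hierarchy above: `FMClassificationTrainingSummary` layers the optimization trace (`objectiveHistory`, `totalIterations` from `TrainingSummary`) on top of the metrics in `BinaryClassificationSummary`. A sketch of what each call site gets back, assuming hypothetical `train`/`test` DataFrames:

```scala
val model = new FMClassifier().fit(train) // fit() attaches an FMClassificationTrainingSummary

model.summary.objectiveHistory            // training summary only: per-iteration loss
model.summary.areaUnderROC                // metrics are available on both summary types
model.evaluate(test).areaUnderROC         // FMClassificationSummary: metrics, no loss history
```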
mllib/src/main/scala/org/apache/spark/ml/regression/FMRegressor.scala
@@ -47,7 +47,7 @@ import org.apache.spark.storage.StorageLevel
  */
 private[ml] trait FactorizationMachinesParams extends PredictorParams
   with HasMaxIter with HasStepSize with HasTol with HasSolver with HasSeed
-  with HasFitIntercept with HasRegParam {
+  with HasFitIntercept with HasRegParam with HasWeightCol {

   /**
    * Param for dimensionality of the factors (&gt;= 0)
@@ -134,7 +134,7 @@ private[ml] trait FactorizationMachines extends FactorizationMachinesParams {
       data: RDD[(Double, OldVector)],
       numFeatures: Int,
       loss: String
-    ): Vector = {
+    ): (Vector, Array[Double]) = {

     // initialize coefficients
     val initialCoefficients = initCoefficients(numFeatures)
@@ -151,8 +151,8 @@ private[ml] trait FactorizationMachines extends FactorizationMachinesParams {
       .setRegParam($(regParam))
       .setMiniBatchFraction($(miniBatchFraction))
       .setConvergenceTol($(tol))
-    val coefficients = optimizer.optimize(data, initialCoefficients)
-    coefficients.asML
+    val (coefficients, lossHistory) = optimizer.optimizeWithLossReturned(data, initialCoefficients)
+    (coefficients.asML, lossHistory)
   }
 }

@@ -421,7 +421,7 @@ class FMRegressor @Since("3.0.0") (

     if (handlePersistence) data.persist(StorageLevel.MEMORY_AND_DISK)

-    val coefficients = trainImpl(data, numFeatures, SquaredError)
+    val (coefficients, _) = trainImpl(data, numFeatures, SquaredError)

     val (intercept, linear, factors) = splitCoefficients(
       coefficients, numFeatures, $(factorSize), $(fitIntercept), $(fitLinear))
mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala
@@ -129,7 +129,20 @@ class GradientDescent private[spark] (private var gradient: Gradient, private var updater: Updater)
    * @return solution vector
    */
   def optimize(data: RDD[(Double, Vector)], initialWeights: Vector): Vector = {
-    val (weights, _) = GradientDescent.runMiniBatchSGD(
+    val (weights, _) = optimizeWithLossReturned(data, initialWeights)
+    weights
+  }
+
+  /**
+   * Runs gradient descent on the given training data.
+   * @param data training data
+   * @param initialWeights initial weights
+   * @return solution vector and the loss at each iteration, in an array
+   */
+  def optimizeWithLossReturned(
+      data: RDD[(Double, Vector)],
+      initialWeights: Vector): (Vector, Array[Double]) = {
+    GradientDescent.runMiniBatchSGD(
       data,
       gradient,
       updater,
@@ -139,7 +152,6 @@ class GradientDescent private[spark] (private var gradient: Gradient, private var updater: Updater)
       miniBatchFraction,
       initialWeights,
       convergenceTol)
-    weights
   }

 }
@@ -195,7 +207,7 @@ object GradientDescent extends Logging {
         s"numIterations=$numIterations and miniBatchFraction=$miniBatchFraction")
     }

-    val stochasticLossHistory = new ArrayBuffer[Double](numIterations)
+    val stochasticLossHistory = new ArrayBuffer[Double](numIterations + 1)
     // Record previous weight and current one to calculate solution vector difference

     var previousWeights: Option[Vector] = None
@@ -226,7 +238,7 @@

     var converged = false // indicates whether converged based on convergenceTol
     var i = 1
-    while (!converged && i <= numIterations) {
+    while (!converged && (i <= numIterations + 1)) {
       val bcWeights = data.context.broadcast(weights)
       // Sample a subset (fraction miniBatchFraction) of the total data
       // compute and sum up the subgradients on this subset (this is one map-reduce)
@@ -249,17 +261,19 @@
          * and regVal is the regularization value computed in the previous iteration as well.
          */
         stochasticLossHistory += lossSum / miniBatchSize + regVal
-        val update = updater.compute(
-          weights, Vectors.fromBreeze(gradientSum / miniBatchSize.toDouble),
-          stepSize, i, regParam)
-        weights = update._1
-        regVal = update._2
-
-        previousWeights = currentWeights
-        currentWeights = Some(weights)
-        if (previousWeights != None && currentWeights != None) {
-          converged = isConverged(previousWeights.get,
-            currentWeights.get, convergenceTol)
+        if (i != (numIterations + 1)) {
+          val update = updater.compute(
+            weights, Vectors.fromBreeze(gradientSum / miniBatchSize.toDouble),
+            stepSize, i, regParam)
+          weights = update._1
+          regVal = update._2
+
+          previousWeights = currentWeights
+          currentWeights = Some(weights)
+          if (previousWeights != None && currentWeights != None) {
+            converged = isConverged(previousWeights.get,
+              currentWeights.get, convergenceTol)
+          }
         }
       } else {
         logWarning(s"Iteration ($i/$numIterations). The size of sampled batch is zero")
@@ -271,7 +285,6 @@
         stochasticLossHistory.takeRight(10).mkString(", ")))

     (weights, stochasticLossHistory.toArray)
-
   }

   /**
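
The `numIterations + 1` changes above make the loop take one final pass that records the loss of the last weights without applying another update; that last update is what the new `if (i != (numIterations + 1))` guard skips. A sketch of the resulting contract — illustrative only, since the constructor is `private[spark]`, and `n`, `data`, `initialWeights`, `gradient`, `updater` are assumed to be in scope:

```scala
val gd = new GradientDescent(gradient, updater).setNumIterations(n)
val (weights, lossHistory) = gd.optimizeWithLossReturned(data, initialWeights)

// lossHistory(k) is computed from the weights *before* update k+1, and the
// final entry is the loss of the returned weights, so with convergence
// checking enabled there are at most n + 1 entries.
assert(lossHistory.length <= n + 1)
```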
mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala
@@ -136,7 +136,14 @@ class LBFGS(private var gradient: Gradient, private var updater: Updater)
   }

   override def optimize(data: RDD[(Double, Vector)], initialWeights: Vector): Vector = {
-    val (weights, _) = LBFGS.runLBFGS(
+    val (weights, _) = optimizeWithLossReturned(data, initialWeights)
+    weights
+  }
+
+  def optimizeWithLossReturned(
+      data: RDD[(Double, Vector)],
+      initialWeights: Vector): (Vector, Array[Double]) = {
+    LBFGS.runLBFGS(
       data,
       gradient,
       updater,
@@ -145,9 +152,7 @@ class LBFGS(private var gradient: Gradient, private var updater: Updater)
       maxNumIterations,
       regParam,
       initialWeights)
-    weights
   }
-
 }

 /**
mllib/src/test/scala/org/apache/spark/ml/classification/FMClassifierSuite.scala
@@ -194,6 +194,32 @@ class FMClassifierSuite extends MLTest with DefaultReadWriteTest {
     testPredictionModelSinglePrediction(fmModel, smallBinaryDataset)
   }

+  test("summary and training summary") {
+    val fm = new FMClassifier()
+    val model = fm.setMaxIter(5).fit(smallBinaryDataset)
+
+    val summary = model.evaluate(smallBinaryDataset)
+
+    assert(model.summary.accuracy === summary.accuracy)
+    assert(model.summary.weightedPrecision === summary.weightedPrecision)
+    assert(model.summary.weightedRecall === summary.weightedRecall)
+    assert(model.summary.pr.collect() === summary.pr.collect())
+    assert(model.summary.roc.collect() === summary.roc.collect())
+    assert(model.summary.areaUnderROC === summary.areaUnderROC)
+  }
+
+  test("FMClassifier training summary totalIterations") {
+    Seq(1, 5, 10, 20, 100).foreach { maxIter =>
+      val trainer = new FMClassifier().setMaxIter(maxIter)
+      val model = trainer.fit(smallBinaryDataset)
+      if (maxIter == 1) {
+        assert(model.summary.totalIterations === maxIter)
+      } else {
+        assert(model.summary.totalIterations <= maxIter)
+      }
+    }
+  }
+
   test("read/write") {
     def checkModelData(
         model: FMClassificationModel,
python/pyspark/ml/classification.py (46 additions, 2 deletions)
@@ -52,7 +52,8 @@
            'NaiveBayes', 'NaiveBayesModel',
            'MultilayerPerceptronClassifier', 'MultilayerPerceptronClassificationModel',
            'OneVsRest', 'OneVsRestModel',
-           'FMClassifier', 'FMClassificationModel']
+           'FMClassifier', 'FMClassificationModel', 'FMClassificationSummary',
+           'FMClassificationTrainingSummary']


 class _ClassifierParams(HasRawPredictionCol, _PredictorParams):
@@ -3226,7 +3227,7 @@ def setRegParam(self, value):


 class FMClassificationModel(_JavaProbabilisticClassificationModel, _FactorizationMachinesParams,
-                            JavaMLWritable, JavaMLReadable):
+                            JavaMLWritable, JavaMLReadable, HasTrainingSummary):
     """
     Model fitted by :class:`FMClassifier`.
@@ -3257,6 +3258,49 @@ def factors(self):
         """
         return self._call_java("factors")

+    @property
+    @since("3.1.0")
+    def summary(self):
+        """
+        Gets summary (e.g. accuracy/precision/recall, objective history, total iterations) of model
+        trained on the training set. An exception is thrown if `trainingSummary is None`.
+        """
+        if self.hasSummary:
+            return FMClassificationTrainingSummary(super(FMClassificationModel, self).summary)
+        else:
+            raise RuntimeError("No training summary available for this %s" %
+                               self.__class__.__name__)
+
+    @since("3.1.0")
+    def evaluate(self, dataset):
+        """
+        Evaluates the model on a test dataset.
+
+        :param dataset:
+          Test dataset to evaluate model on, where dataset is an
+          instance of :py:class:`pyspark.sql.DataFrame`
+        """
+        if not isinstance(dataset, DataFrame):
+            raise ValueError("dataset must be a DataFrame but got %s." % type(dataset))
+        java_fm_summary = self._call_java("evaluate", dataset)
+        return FMClassificationSummary(java_fm_summary)
+
+
+class FMClassificationSummary(_BinaryClassificationSummary):
+    """
+    Abstraction for FMClassifier Results for a given model.
+
+    .. versionadded:: 3.1.0
+    """
+    pass
+
+
+@inherit_doc
+class FMClassificationTrainingSummary(FMClassificationSummary, _TrainingSummary):
+    """
+    Abstraction for FMClassifier Training results.
+
+    .. versionadded:: 3.1.0
+    """
+    pass
+
+
 if __name__ == "__main__":
     import doctest