[SPARK-3181][MLLIB]: Add Robust Regression Algorithm with Huber Estimator #8013

Closed · wants to merge 1 commit
mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala
@@ -66,7 +66,11 @@ private[shared] object SharedParamsCodeGen {
"options may be added later.",
isValid = "ParamValidators.inArray(Array(\"skip\", \"error\"))"),
ParamDesc[Boolean]("standardization", "whether to standardize the training features" +
-  " before fitting the model", Some("true")),
+  " before fitting the model.", Some("true")),
+ParamDesc[Boolean]("robustRegression", "whether to use robust Huber Cost Function",
+  Some("false")),
+ParamDesc[Double]("robustEfficiency", "threshold in the distribution of errors (>= 0)",
+  isValid = "ParamValidators.gtEq(0)"),
ParamDesc[Long]("seed", "random seed", Some("this.getClass.getName.hashCode.toLong")),
ParamDesc[Double]("elasticNetParam", "the ElasticNet mixing parameter, in range [0, 1]." +
" For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty",
mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala
@@ -296,6 +296,38 @@ private[ml] trait HasStandardization extends Params {
final def getStandardization: Boolean = $(standardization)
}
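// Note: the traits in this file are generated by SharedParamsCodeGen from the
// ParamDesc entries shown above.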

/**
* Trait for shared param robustRegression (default: false).
*/
private[ml] trait HasRobustRegression extends Params {

/**
* Param for whether to use robust Huber Cost Function.
* @group param
*/
final val robustRegression: BooleanParam = new BooleanParam(this, "robustRegression", "whether to use robust Huber Cost Function")

setDefault(robustRegression, false)

/** @group getParam */
final def getRobustRegression: Boolean = $(robustRegression)
}

/**
* Trait for shared param robustEfficiency.
*/
private[ml] trait HasRobustEfficiency extends Params {

/**
* Param for threshold in the distribution of errors (>= 0).
* @group param
*/
final val robustEfficiency: DoubleParam = new DoubleParam(this, "robustEfficiency", "threshold in the distribution of errors (>= 0)", ParamValidators.gtEq(0))

/** @group getParam */
final def getRobustEfficiency: Double = $(robustEfficiency)
}

/**
* Trait for shared param seed (default: this.getClass.getName.hashCode.toLong).
*/
@@ -388,5 +420,25 @@ private[ml] trait HasSolver extends Params {

/** @group getParam */
final def getSolver: String = $(solver)

}

/**
* Trait for shared param robust (default: false).
*/
private[ml] trait HasRobust extends Params {

/**
* Param for whether to use robust Huber Cost Function.
* @group param
*/
final val robust: BooleanParam = new BooleanParam(this, "robust", "whether to use robust Huber Cost Function")

setDefault(robust, false)

/** @group getParam */
final def getRobust: Boolean = $(robust)

}

// scalastyle:on
mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
@@ -46,7 +46,8 @@ import org.apache.spark.storage.StorageLevel
*/
private[regression] trait LinearRegressionParams extends PredictorParams
with HasRegParam with HasElasticNetParam with HasMaxIter with HasTol
-with HasFitIntercept with HasStandardization with HasWeightCol with HasSolver
+with HasFitIntercept with HasStandardization with HasRobustRegression
+with HasRobustEfficiency with HasWeightCol with HasSolver

/**
* :: Experimental ::
@@ -102,6 +103,22 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
def setStandardization(value: Boolean): this.type = set(standardization, value)
setDefault(standardization -> true)

/**
* Set the robustRegression option to determine whether to use the robust Huber cost function.
* Default is false.
* @group setParam
*/
def setRobustRegression(value: Boolean): this.type = set(robustRegression, value)
setDefault(robustRegression -> false)

/**
* Set the threshold k in the probability distribution for the errors of robust regression.
* Default is 1.345, which produces 95% efficiency when the errors are normal.
* @group setParam
*/
def setRobustEfficiency(value: Double): this.type = set(robustEfficiency, value)
setDefault(robustEfficiency -> 1.345)
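// Hypothetical usage sketch (not part of this diff; assumes the setters above
// and a `training` DataFrame of (label, features) rows):
//   val lr = new LinearRegression()
//     .setRobustRegression(true)   // switch to the Huber cost function
//     .setRobustEfficiency(1.345)  // k = 1.345 gives ~95% efficiency under normal errors
//   val model = lr.fit(training)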

/**
* Set the ElasticNet mixing parameter.
* For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.
@@ -156,6 +173,14 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
def setSolver(value: String): this.type = set(solver, value)
setDefault(solver -> "auto")

/**
* Set the robust option to determine whether to use the robust Huber cost function.
* Default is false.
* @group setParam
*/
def setRobust(value: Boolean): this.type = set(robust, value)
setDefault(robust -> false)

override protected def train(dataset: DataFrame): LinearRegressionModel = {
// Extract the number of features before deciding optimization solver.
val numFeatures = dataset.select(col($(featuresCol))).limit(1).map {
@@ -255,8 +280,13 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
val effectiveL1RegParam = $(elasticNetParam) * effectiveRegParam
val effectiveL2RegParam = (1.0 - $(elasticNetParam)) * effectiveRegParam

-val costFun = new LeastSquaresCostFun(instances, yStd, yMean, $(fitIntercept),
-  $(standardization), featuresStd, featuresMean, effectiveL2RegParam)
+val costFun = if ($(robustRegression)) {
+  new HuberCostFun(instances, yStd, yMean, $(fitIntercept),
+    $(standardization), featuresStd, featuresMean, effectiveL2RegParam, $(robustEfficiency))
+} else {
+  new LeastSquaresCostFun(instances, yStd, yMean, $(fitIntercept),
+    $(standardization), featuresStd, featuresMean, effectiveL2RegParam)
+}

val optimizer = if ($(elasticNetParam) == 0.0 || effectiveRegParam == 0.0) {
new BreezeLBFGS[BDV[Double]]($(maxIter), 10, $(tol))
@@ -937,3 +967,179 @@ private class LeastSquaresCostFun(
(leastSquaresAggregator.loss + regVal, new BDV(totalGradientArray))
}
}

/**
 * HuberCostFun implements Breeze's DiffFunction[T] for the Huber cost as used in robust regression.
 * The Huber M-estimator corresponds to a probability distribution for the errors which is normal
 * in the centre but like a double exponential distribution in the tails (Hogg 1979: 109).
 * L = 1/2 ||A weights - y||^2      if |A weights - y| <= k
 * L = k |A weights - y| - 1/2 k^2  if |A weights - y| > k
 * where k = 1.345, which produces 95% efficiency when the errors are normal and
 * substantial resistance to outliers otherwise.
 * See also the documentation for the precise formulation.
 * It's used in Breeze's convex optimization routines.
 */

// Contributor: nit: surround references to code in [[DiffFunction[T]]] to have docs generate links.
// Member: Is it possible to make k tunable and default to 1.345 by having another param?
// Member: BTW, it would be nice if users could set a % of efficiency instead of a magic
// number like 1.345.
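// A minimal pointwise sketch of the loss above and the derivative it implies,
// for a scalar residual r = (A weights - y). Illustrative only; these helpers
// are not part of this diff and the names are hypothetical.
private object HuberPointwise {
  def loss(r: Double, k: Double = 1.345): Double =
    if (math.abs(r) <= k) 0.5 * r * r
    else k * math.abs(r) - 0.5 * k * k

  def gradient(r: Double, k: Double = 1.345): Double =
    math.max(-k, math.min(k, r)) // the residual clipped to [-k, k]
}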

private class HuberAggregator(
weights: Vector,
labelStd: Double,
labelMean: Double,
fitIntercept: Boolean,
featuresStd: Array[Double],
featuresMean: Array[Double],
robustEfficiency: Double) extends Serializable {

private var totalCnt: Long = 0L
private var lossSum = 0.0

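// Rescale the raw weights into the standardized feature space and fold the
// intercept into a constant offset, mirroring LeastSquaresAggregator.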
private val (effectiveWeightsArray: Array[Double], offset: Double, dim: Int) = {
val weightsArray = weights.toArray.clone()
var sum = 0.0
var i = 0
val len = weightsArray.length
while (i < len) {
if (featuresStd(i) != 0.0) {
weightsArray(i) /= featuresStd(i)
sum += weightsArray(i) * featuresMean(i)
} else {
weightsArray(i) = 0.0
}
i += 1
}
(weightsArray, if (fitIntercept) labelMean / labelStd - sum else 0.0, weightsArray.length)
}

private val effectiveWeightsVector = Vectors.dense(effectiveWeightsArray)

private val gradientSumArray = Array.ofDim[Double](dim)

/**
* Add a new training sample to this HuberAggregator, and update the loss and gradient
* of the objective function.
*
* @param label The label for this data point.
* @param data The features for one data point in dense/sparse vector format to be added
* into this aggregator.
* @return This HuberAggregator object.
*/
def add(label: Double, data: Vector): this.type = {
require(dim == data.size, s"Dimensions mismatch when adding new sample." +
s" Expecting $dim but got ${data.size}.")

val diff = dot(data, effectiveWeightsVector) - label / labelStd + offset
val k = robustEfficiency

if (diff < -k) {
  // Member: This looks right now.
  // Linear tail: loss k * |diff| - k^2 / 2; the gradient is the residual
  // clipped to -k, so the tails are not flat for the optimizer.
  val localGradientSumArray = gradientSumArray
  data.foreachActive { (index, value) =>
    if (featuresStd(index) != 0.0 && value != 0.0) {
      localGradientSumArray(index) += -k * value / featuresStd(index)
    }
  }
  lossSum += -k * diff - 0.5 * k * k
} else if (diff > k) {
  // Linear tail: the gradient is the residual clipped to +k.
  val localGradientSumArray = gradientSumArray
  data.foreachActive { (index, value) =>
    if (featuresStd(index) != 0.0 && value != 0.0) {
      localGradientSumArray(index) += k * value / featuresStd(index)
    }
  }
  lossSum += k * diff - 0.5 * k * k
} else if (diff != 0) {
  // Quadratic centre: loss diff^2 / 2 with gradient diff, matching the
  // formulas in the doc comment above.
  val localGradientSumArray = gradientSumArray
  data.foreachActive { (index, value) =>
    if (featuresStd(index) != 0.0 && value != 0.0) {
      localGradientSumArray(index) += diff * value / featuresStd(index)
    }
  }
  lossSum += diff * diff / 2.0
}

totalCnt += 1
this
}
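// Worked example (illustrative numbers): with k = 1.345, a standardized
// residual diff = 3.0 falls in the linear tail and contributes
// k * 3.0 - 0.5 * k * k ≈ 4.035 - 0.905 ≈ 3.13 to lossSum, while diff = 0.5
// stays in the quadratic centre and contributes 0.5 * 0.5 / 2.0 = 0.125.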

/**
* Merge another HuberAggregator, and update the loss and gradient
* of the objective function.
* (Note that it's in place merging; as a result, `this` object will be modified.)
*
* @param other The other HuberAggregator to be merged.
* @return This HuberAggregator object.
*/
def merge(other: HuberAggregator): this.type = {
require(dim == other.dim, s"Dimensions mismatch when merging with another " +
s"HuberAggregator. Expecting $dim but got ${other.dim}.")

if (other.totalCnt != 0) {
totalCnt += other.totalCnt
lossSum += other.lossSum

var i = 0
val localThisGradientSumArray = this.gradientSumArray
val localOtherGradientSumArray = other.gradientSumArray
while (i < dim) {
localThisGradientSumArray(i) += localOtherGradientSumArray(i)
i += 1
}
}
this
}

def count: Long = totalCnt

def loss: Double = lossSum / totalCnt

def gradient: Vector = {
val result = Vectors.dense(gradientSumArray.clone())
scal(1.0 / totalCnt, result)
result
}
}

private class HuberCostFun(
data: RDD[(Double, Vector)],
labelStd: Double,
labelMean: Double,
fitIntercept: Boolean,
standardization: Boolean,
featuresStd: Array[Double],
featuresMean: Array[Double],
effectiveL2regParam: Double,
robustEfficiency: Double) extends DiffFunction[BDV[Double]] {

// Member: Still have lots of duplicated code. Do you think you can pass a
// parameter to switch between the different losses?

override def calculate(weights: BDV[Double]): (Double, BDV[Double]) = {
val w = Vectors.fromBreeze(weights)

val huberAggregator = data.treeAggregate(new HuberAggregator(w, labelStd,
labelMean, fitIntercept, featuresStd, featuresMean, robustEfficiency))(
seqOp = (c, v) => (c, v) match {
// Contributor: this can just be
//   seqOp = { case (aggregator, (label, features)) => aggregator.add(label, features) }
// Ditto for combOp.
case (aggregator, (label, features)) => aggregator.add(label, features)
},
combOp = (c1, c2) => (c1, c2) match {
case (aggregator1, aggregator2) => aggregator1.merge(aggregator2)
})

val totalGradientArray = huberAggregator.gradient.toArray

val regVal = if (effectiveL2regParam == 0.0) {
0.0
} else {
var sum = 0.0
w.foreachActive { (index, value) =>
// The following code computes both the loss and the gradient of the
// regularization, adding the gradient back to totalGradientArray.
sum += {
if (standardization) {
totalGradientArray(index) += effectiveL2regParam * value
value * value
} else {
if (featuresStd(index) != 0.0) {
// If `standardization` is false, we still standardize the data
// to improve the rate of convergence; as a result, we have to
// perform this reverse standardization by penalizing each component
// differently to get effectively the same objective function when
// the training dataset is not standardized.
val temp = value / (featuresStd(index) * featuresStd(index))
totalGradientArray(index) += effectiveL2regParam * temp
value * temp
} else {
0.0
}
}
}
}
0.5 * effectiveL2regParam * sum
}

(huberAggregator.loss + regVal, new BDV(totalGradientArray))
}
}
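A minimal sketch of how Breeze's convex optimization routines consume a DiffFunction such as HuberCostFun, mirroring the L-BFGS setup in train() above (illustrative; a toy quadratic stands in for the real cost function):

import breeze.linalg.{DenseVector => BDV}
import breeze.optimize.{DiffFunction, LBFGS}

val toy = new DiffFunction[BDV[Double]] {
  // f(w) = ||w - 1||^2 / 2, with gradient w - 1.
  override def calculate(w: BDV[Double]): (Double, BDV[Double]) = {
    val r = w - 1.0
    ((r dot r) / 2.0, r)
  }
}

val lbfgs = new LBFGS[BDV[Double]](maxIter = 100, m = 10, tolerance = 1e-6)
val wStar = lbfgs.minimize(toy, BDV.zeros[Double](3)) // converges to all ones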