Skip to content

Commit

Permalink
[SPARK-24102][ML][MLLIB][PYSPARK][FOLLOWUP] Added weight column to py…
Browse files Browse the repository at this point in the history
…spark API for regression evaluator and metrics

## What changes were proposed in this pull request?
Follow-up to PR #17085.
This PR adds the weight column to the PySpark side, which was already added to the Scala API.
The PR also undoes a name change on the Scala side corresponding to a change in another similar PR, as noted here:
#17084 (comment)

## How was this patch tested?

This patch adds Python tests for the changes to the PySpark API.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Closes #24197 from imatiach-msft/ilmat/regressor-eval-python.

Authored-by: Ilya Matiach <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
  • Loading branch information
imatiach-msft authored and srowen committed Mar 26, 2019
1 parent 0e16a6f commit 887279c
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,19 @@ import org.apache.spark.internal.Logging
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.stat.{MultivariateOnlineSummarizer, MultivariateStatisticalSummary}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.{DataFrame, Row}

/**
* Evaluator for regression.
*
* @param predAndObsWithOptWeight an RDD of either (prediction, observation, weight)
* @param predictionAndObservations an RDD of either (prediction, observation, weight)
* or (prediction, observation) pairs
* @param throughOrigin True if the regression is through the origin. For example, in linear
* regression, it will be true without fitting intercept.
*/
@Since("1.2.0")
class RegressionMetrics @Since("2.0.0") (
predAndObsWithOptWeight: RDD[_ <: Product], throughOrigin: Boolean)
predictionAndObservations: RDD[_ <: Product], throughOrigin: Boolean)
extends Logging {

@Since("1.2.0")
Expand All @@ -47,13 +47,20 @@ class RegressionMetrics @Since("2.0.0") (
* prediction and observation
*/
private[mllib] def this(predictionAndObservations: DataFrame) =
this(predictionAndObservations.rdd.map(r => (r.getDouble(0), r.getDouble(1))))
this(predictionAndObservations.rdd.map {
case Row(prediction: Double, label: Double, weight: Double) =>
(prediction, label, weight)
case Row(prediction: Double, label: Double) =>
(prediction, label, 1.0)
case other =>
throw new IllegalArgumentException(s"Expected Row of tuples, got $other")
})

/**
* Use MultivariateOnlineSummarizer to calculate summary statistics of observations and errors.
*/
private lazy val summary: MultivariateStatisticalSummary = {
val summary: MultivariateStatisticalSummary = predAndObsWithOptWeight.map {
val summary: MultivariateStatisticalSummary = predictionAndObservations.map {
case (prediction: Double, observation: Double, weight: Double) =>
(Vectors.dense(observation, observation - prediction), weight)
case (prediction: Double, observation: Double) =>
Expand All @@ -70,7 +77,7 @@ class RegressionMetrics @Since("2.0.0") (
private lazy val SStot = summary.variance(0) * (summary.weightSum - 1)
private lazy val SSreg = {
val yMean = summary.mean(0)
predAndObsWithOptWeight.map {
predictionAndObservations.map {
case (prediction: Double, _: Double, weight: Double) =>
math.pow(prediction - yMean, 2) * weight
case (prediction: Double, _: Double) => math.pow(prediction - yMean, 2)
Expand Down
21 changes: 14 additions & 7 deletions python/pyspark/ml/evaluation.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,13 +190,13 @@ def setParams(self, rawPredictionCol="rawPrediction", labelCol="label",


@inherit_doc
class RegressionEvaluator(JavaEvaluator, HasLabelCol, HasPredictionCol,
class RegressionEvaluator(JavaEvaluator, HasLabelCol, HasPredictionCol, HasWeightCol,
JavaMLReadable, JavaMLWritable):
"""
.. note:: Experimental
Evaluator for Regression, which expects two input
columns: prediction and label.
Evaluator for Regression, which expects input columns prediction, label
and an optional weight column.
>>> scoreAndLabels = [(-28.98343821, -27.0), (20.21491975, 21.5),
... (-25.98418959, -22.0), (30.69731842, 33.0), (74.69283752, 71.0)]
Expand All @@ -214,6 +214,13 @@ class RegressionEvaluator(JavaEvaluator, HasLabelCol, HasPredictionCol,
>>> evaluator2 = RegressionEvaluator.load(re_path)
>>> str(evaluator2.getPredictionCol())
'raw'
>>> scoreAndLabelsAndWeight = [(-28.98343821, -27.0, 1.0), (20.21491975, 21.5, 0.8),
... (-25.98418959, -22.0, 1.0), (30.69731842, 33.0, 0.6), (74.69283752, 71.0, 0.2)]
>>> dataset = spark.createDataFrame(scoreAndLabelsAndWeight, ["raw", "label", "weight"])
...
>>> evaluator = RegressionEvaluator(predictionCol="raw", weightCol="weight")
>>> evaluator.evaluate(dataset)
2.740...
.. versionadded:: 1.4.0
"""
Expand All @@ -227,10 +234,10 @@ class RegressionEvaluator(JavaEvaluator, HasLabelCol, HasPredictionCol,

@keyword_only
def __init__(self, predictionCol="prediction", labelCol="label",
metricName="rmse"):
metricName="rmse", weightCol=None):
"""
__init__(self, predictionCol="prediction", labelCol="label", \
metricName="rmse")
metricName="rmse", weightCol=None)
"""
super(RegressionEvaluator, self).__init__()
self._java_obj = self._new_java_obj(
Expand All @@ -256,10 +263,10 @@ def getMetricName(self):
@keyword_only
@since("1.4.0")
def setParams(self, predictionCol="prediction", labelCol="label",
metricName="rmse"):
metricName="rmse", weightCol=None):
"""
setParams(self, predictionCol="prediction", labelCol="label", \
metricName="rmse")
metricName="rmse", weightCol=None)
Sets params for regression evaluator.
"""
kwargs = self._input_kwargs
Expand Down
16 changes: 12 additions & 4 deletions python/pyspark/mllib/evaluation.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,7 @@ class RegressionMetrics(JavaModelWrapper):
"""
Evaluator for regression.
:param predictionAndObservations: an RDD of (prediction,
observation) pairs.
:param predictionAndObservations: an RDD of prediction, observation and optional weight.
>>> predictionAndObservations = sc.parallelize([
... (2.5, 3.0), (0.0, -0.5), (2.0, 2.0), (8.0, 7.0)])
Expand All @@ -111,16 +110,25 @@ class RegressionMetrics(JavaModelWrapper):
0.61...
>>> metrics.r2
0.94...
>>> predictionAndObservationsWithOptWeight = sc.parallelize([
... (2.5, 3.0, 0.5), (0.0, -0.5, 1.0), (2.0, 2.0, 0.3), (8.0, 7.0, 0.9)])
>>> metrics = RegressionMetrics(predictionAndObservationsWithOptWeight)
>>> metrics.rootMeanSquaredError
0.68...
.. versionadded:: 1.4.0
"""

def __init__(self, predictionAndObservations):
sc = predictionAndObservations.ctx
sql_ctx = SQLContext.getOrCreate(sc)
df = sql_ctx.createDataFrame(predictionAndObservations, schema=StructType([
numCol = len(predictionAndObservations.first())
schema = StructType([
StructField("prediction", DoubleType(), nullable=False),
StructField("observation", DoubleType(), nullable=False)]))
StructField("observation", DoubleType(), nullable=False)])
if numCol == 3:
schema.add("weight", DoubleType(), False)
df = sql_ctx.createDataFrame(predictionAndObservations, schema=schema)
java_class = sc._jvm.org.apache.spark.mllib.evaluation.RegressionMetrics
java_model = java_class(df._jdf)
super(RegressionMetrics, self).__init__(java_model)
Expand Down

0 comments on commit 887279c

Please sign in to comment.