diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/RegressionMetrics.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/RegressionMetrics.scala index 525047973ad5c..3cf858471b98d 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/RegressionMetrics.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/RegressionMetrics.scala @@ -22,19 +22,19 @@ import org.apache.spark.internal.Logging import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.stat.{MultivariateOnlineSummarizer, MultivariateStatisticalSummary} import org.apache.spark.rdd.RDD -import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.{DataFrame, Row} /** * Evaluator for regression. * - * @param predAndObsWithOptWeight an RDD of either (prediction, observation, weight) + * @param predictionAndObservations an RDD of either (prediction, observation, weight) * or (prediction, observation) pairs * @param throughOrigin True if the regression is through the origin. For example, in linear * regression, it will be true without fitting intercept. */ @Since("1.2.0") class RegressionMetrics @Since("2.0.0") ( - predAndObsWithOptWeight: RDD[_ <: Product], throughOrigin: Boolean) + predictionAndObservations: RDD[_ <: Product], throughOrigin: Boolean) extends Logging { @Since("1.2.0") @@ -47,13 +47,20 @@ class RegressionMetrics @Since("2.0.0") ( * prediction and observation */ private[mllib] def this(predictionAndObservations: DataFrame) = - this(predictionAndObservations.rdd.map(r => (r.getDouble(0), r.getDouble(1)))) + this(predictionAndObservations.rdd.map { + case Row(prediction: Double, label: Double, weight: Double) => + (prediction, label, weight) + case Row(prediction: Double, label: Double) => + (prediction, label, 1.0) + case other => + throw new IllegalArgumentException(s"Expected Row of tuples, got $other") + }) /** * Use MultivariateOnlineSummarizer to calculate summary statistics of observations and errors. */ private lazy val summary: MultivariateStatisticalSummary = { - val summary: MultivariateStatisticalSummary = predAndObsWithOptWeight.map { + val summary: MultivariateStatisticalSummary = predictionAndObservations.map { case (prediction: Double, observation: Double, weight: Double) => (Vectors.dense(observation, observation - prediction), weight) case (prediction: Double, observation: Double) => @@ -70,7 +77,7 @@ class RegressionMetrics @Since("2.0.0") ( private lazy val SStot = summary.variance(0) * (summary.weightSum - 1) private lazy val SSreg = { val yMean = summary.mean(0) - predAndObsWithOptWeight.map { + predictionAndObservations.map { case (prediction: Double, _: Double, weight: Double) => math.pow(prediction - yMean, 2) * weight case (prediction: Double, _: Double) => math.pow(prediction - yMean, 2) diff --git a/python/pyspark/ml/evaluation.py b/python/pyspark/ml/evaluation.py index 0f70860ceaf0f..8aca74d3f4d30 100644 --- a/python/pyspark/ml/evaluation.py +++ b/python/pyspark/ml/evaluation.py @@ -190,13 +190,13 @@ def setParams(self, rawPredictionCol="rawPrediction", labelCol="label", @inherit_doc -class RegressionEvaluator(JavaEvaluator, HasLabelCol, HasPredictionCol, +class RegressionEvaluator(JavaEvaluator, HasLabelCol, HasPredictionCol, HasWeightCol, JavaMLReadable, JavaMLWritable): """ .. note:: Experimental - Evaluator for Regression, which expects two input - columns: prediction and label. + Evaluator for Regression, which expects input columns prediction, label + and an optional weight column. >>> scoreAndLabels = [(-28.98343821, -27.0), (20.21491975, 21.5), ... (-25.98418959, -22.0), (30.69731842, 33.0), (74.69283752, 71.0)] @@ -214,6 +214,13 @@ class RegressionEvaluator(JavaEvaluator, HasLabelCol, HasPredictionCol, >>> evaluator2 = RegressionEvaluator.load(re_path) >>> str(evaluator2.getPredictionCol()) 'raw' + >>> scoreAndLabelsAndWeight = [(-28.98343821, -27.0, 1.0), (20.21491975, 21.5, 0.8), + ... (-25.98418959, -22.0, 1.0), (30.69731842, 33.0, 0.6), (74.69283752, 71.0, 0.2)] + >>> dataset = spark.createDataFrame(scoreAndLabelsAndWeight, ["raw", "label", "weight"]) + ... + >>> evaluator = RegressionEvaluator(predictionCol="raw", weightCol="weight") + >>> evaluator.evaluate(dataset) + 2.740... .. versionadded:: 1.4.0 """ @@ -227,10 +234,10 @@ class RegressionEvaluator(JavaEvaluator, HasLabelCol, HasPredictionCol, @keyword_only def __init__(self, predictionCol="prediction", labelCol="label", - metricName="rmse"): + metricName="rmse", weightCol=None): """ __init__(self, predictionCol="prediction", labelCol="label", \ - metricName="rmse") + metricName="rmse", weightCol=None) """ super(RegressionEvaluator, self).__init__() self._java_obj = self._new_java_obj( @@ -256,10 +263,10 @@ def getMetricName(self): @keyword_only @since("1.4.0") def setParams(self, predictionCol="prediction", labelCol="label", - metricName="rmse"): + metricName="rmse", weightCol=None): """ setParams(self, predictionCol="prediction", labelCol="label", \ - metricName="rmse") + metricName="rmse", weightCol=None) Sets params for regression evaluator. """ kwargs = self._input_kwargs diff --git a/python/pyspark/mllib/evaluation.py b/python/pyspark/mllib/evaluation.py index 171c62ce97f9b..30032b31982b0 100644 --- a/python/pyspark/mllib/evaluation.py +++ b/python/pyspark/mllib/evaluation.py @@ -95,8 +95,7 @@ class RegressionMetrics(JavaModelWrapper): """ Evaluator for regression. - :param predictionAndObservations: an RDD of (prediction, - observation) pairs. + :param predictionAndObservations: an RDD of prediction, observation and optional weight. >>> predictionAndObservations = sc.parallelize([ ... (2.5, 3.0), (0.0, -0.5), (2.0, 2.0), (8.0, 7.0)]) @@ -111,6 +110,11 @@ class RegressionMetrics(JavaModelWrapper): 0.61... >>> metrics.r2 0.94... + >>> predictionAndObservationsWithOptWeight = sc.parallelize([ + ... (2.5, 3.0, 0.5), (0.0, -0.5, 1.0), (2.0, 2.0, 0.3), (8.0, 7.0, 0.9)]) + >>> metrics = RegressionMetrics(predictionAndObservationsWithOptWeight) + >>> metrics.rootMeanSquaredError + 0.68... .. versionadded:: 1.4.0 """ @@ -118,9 +122,13 @@ class RegressionMetrics(JavaModelWrapper): def __init__(self, predictionAndObservations): sc = predictionAndObservations.ctx sql_ctx = SQLContext.getOrCreate(sc) - df = sql_ctx.createDataFrame(predictionAndObservations, schema=StructType([ + numCol = len(predictionAndObservations.first()) + schema = StructType([ StructField("prediction", DoubleType(), nullable=False), - StructField("observation", DoubleType(), nullable=False)])) + StructField("observation", DoubleType(), nullable=False)]) + if numCol == 3: + schema.add("weight", DoubleType(), False) + df = sql_ctx.createDataFrame(predictionAndObservations, schema=schema) java_class = sc._jvm.org.apache.spark.mllib.evaluation.RegressionMetrics java_model = java_class(df._jdf) super(RegressionMetrics, self).__init__(java_model)