[SPARK-20134][SQL] SQLMetrics.postDriverMetricUpdates to simplify driver side metric updates

## What changes were proposed in this pull request?
It is not intuitive how to update a SQLMetric on the driver side. This patch introduces a new `SQLMetrics.postDriverMetricUpdates` function to do that, and adds documentation to make it more obvious.
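
In essence, each call site's inline guard collapses into a single helper call. A condensed before/after, taken directly from the diffs below:

```scala
// Before: every call site posted the event itself and had to guard
// against a missing execution id on its own.
if (executionId != null) {
  sparkContext.listenerBus.post(SparkListenerDriverAccumUpdates(
    executionId.toLong, metrics.values.map(m => m.id -> m.value).toSeq))
}

// After: one helper call; the null tolerance lives in one place.
SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, metrics.values.toSeq)
```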

## How was this patch tested?
Updated a test case to use this method.

Author: Reynold Xin <[email protected]>

Closes #17464 from rxin/SPARK-20134.

(cherry picked from commit 9712bd3)
Signed-off-by: Reynold Xin <[email protected]>
rxin committed Mar 29, 2017
1 parent 3095480 commit f8c1b3e
Showing 5 changed files with 34 additions and 17 deletions.
sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala

```diff
@@ -565,13 +565,7 @@ case class SubqueryExec(name: String, child: SparkPlan) extends UnaryExecNode {
         val dataSize = rows.map(_.asInstanceOf[UnsafeRow].getSizeInBytes.toLong).sum
         longMetric("dataSize") += dataSize

-        // There are some cases we don't care about the metrics and call `SparkPlan.doExecute`
-        // directly without setting an execution id. We should be tolerant to it.
-        if (executionId != null) {
-          sparkContext.listenerBus.post(SparkListenerDriverAccumUpdates(
-            executionId.toLong, metrics.values.map(m => m.id -> m.value).toSeq))
-        }
-
+        SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, metrics.values.toSeq)
         rows
       }
     }(SubqueryExec.executionContext)
```
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala

```diff
@@ -97,13 +97,7 @@ case class BroadcastExchangeExec(
       val broadcasted = sparkContext.broadcast(relation)
       longMetric("broadcastTime") += (System.nanoTime() - beforeBroadcast) / 1000000

-      // There are some cases we don't care about the metrics and call `SparkPlan.doExecute`
-      // directly without setting an execution id. We should be tolerant to it.
-      if (executionId != null) {
-        sparkContext.listenerBus.post(SparkListenerDriverAccumUpdates(
-          executionId.toLong, metrics.values.map(m => m.id -> m.value).toSeq))
-      }
-
+      SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, metrics.values.toSeq)
       broadcasted
     } catch {
       case oe: OutOfMemoryError =>
```
sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala

```diff
@@ -22,9 +22,15 @@ import java.util.Locale

 import org.apache.spark.SparkContext
 import org.apache.spark.scheduler.AccumulableInfo
+import org.apache.spark.sql.execution.ui.SparkListenerDriverAccumUpdates
 import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, Utils}


+/**
+ * A metric used in a SQL query plan. This is implemented as an [[AccumulatorV2]]. Updates on
+ * the executor side are automatically propagated and shown in the SQL UI through metrics. Updates
+ * on the driver side must be explicitly posted using [[SQLMetrics.postDriverMetricUpdates()]].
+ */
 class SQLMetric(val metricType: String, initValue: Long = 0L) extends AccumulatorV2[Long, Long] {
   // This is a workaround for SPARK-11013.
   // We may use -1 as initial value of the accumulator, if the accumulator is valid, we will
@@ -126,4 +132,18 @@ object SQLMetrics {
       s"\n$sum ($min, $med, $max)"
     }
   }
+
+  /**
+   * Updates metrics based on the driver side value. This is useful for certain metrics that
+   * are only updated on the driver, e.g. subquery execution time, or number of files.
+   */
+  def postDriverMetricUpdates(
+      sc: SparkContext, executionId: String, metrics: Seq[SQLMetric]): Unit = {
+    // There are some cases we don't care about the metrics and call `SparkPlan.doExecute`
+    // directly without setting an execution id. We should be tolerant to it.
+    if (executionId != null) {
+      sc.listenerBus.post(
+        SparkListenerDriverAccumUpdates(executionId.toLong, metrics.map(m => m.id -> m.value)))
+    }
+  }
 }
```
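
A minimal sketch of how a custom driver-only operator could use this helper, modeled on the MyPlan test helper changed later in this diff. The `MyFileScan` class and its `numFiles` metric are hypothetical; `SQLMetrics.createMetric` and `SQLExecution.EXECUTION_ID_KEY` are existing Spark APIs.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.{LeafExecNode, SQLExecution}
import org.apache.spark.sql.execution.metric.SQLMetrics

// Hypothetical operator for illustration; not part of this patch.
private case class MyFileScan(sc: SparkContext, numFiles: Long) extends LeafExecNode {
  override def output: Seq[Attribute] = Nil

  // Declare the metric up front so it is registered with the plan.
  override lazy val metrics = Map(
    "numFiles" -> SQLMetrics.createMetric(sc, "number of files"))

  override def doExecute(): RDD[InternalRow] = {
    // Update on the driver, then post explicitly so the SQL UI picks it up.
    longMetric("numFiles") += numFiles
    SQLMetrics.postDriverMetricUpdates(
      sc, sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY), metrics.values.toSeq)
    sc.emptyRDD
  }
}
```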
sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala

```diff
@@ -47,6 +47,13 @@ case class SparkListenerSQLExecutionStart(
 case class SparkListenerSQLExecutionEnd(executionId: Long, time: Long)
   extends SparkListenerEvent

+/**
+ * A message used to update SQL metric value for driver-side updates (which doesn't get reflected
+ * automatically).
+ *
+ * @param executionId The execution id for a query, so we can find the query plan.
+ * @param accumUpdates Map from accumulator id to the metric value (metrics are always 64-bit ints).
+ */
 @DeveloperApi
 case class SparkListenerDriverAccumUpdates(
   executionId: Long,
```
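
Since the event is a plain case class posted on the listener bus, any `SparkListener` can observe these driver-side updates via `onOtherEvent`. A minimal sketch, assuming a hypothetical logging listener:

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.execution.ui.SparkListenerDriverAccumUpdates

// Hypothetical listener for illustration; not part of this patch.
class DriverAccumLogger extends SparkListener {
  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case SparkListenerDriverAccumUpdates(executionId, accumUpdates) =>
      // Each pair maps an accumulator id to its latest driver-side Long value.
      accumUpdates.foreach { case (id, value) =>
        println(s"execution $executionId: accumulator $id = $value")
      }
    case _ => // ignore other events
  }
}

// Registration, e.g. during application setup:
// sc.addSparkListener(new DriverAccumLogger)
```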
sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala

```diff
@@ -477,9 +477,11 @@ private case class MyPlan(sc: SparkContext, expectedValue: Long) extends LeafExecNode {

   override def doExecute(): RDD[InternalRow] = {
     longMetric("dummy") += expectedValue
-    sc.listenerBus.post(SparkListenerDriverAccumUpdates(
-      sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY).toLong,
-      metrics.values.map(m => m.id -> m.value).toSeq))
+
+    SQLMetrics.postDriverMetricUpdates(
+      sc,
+      sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY),
+      metrics.values.toSeq)
     sc.emptyRDD
   }
 }
```
