[NSE-612] Add metric "prepare time" for shuffle writer (#613)
Closes #612

The change threads a new `prepareTime` nano-timing metric ("totaltime_prepare") through `ColumnarShuffleDependency`, measures the Arrow record-batch preparation step in `ColumnarShuffleWriter` before the native split call, and registers the metric in `ColumnarShuffleExchangeExec` and `ColumnarShuffleExchangeAdaptor`.
zhztheplayer authored Dec 8, 2021
1 parent 631b587 commit 11baed5
Showing 3 changed files with 15 additions and 5 deletions.
File 1 of 3: ColumnarShuffleDependency.scala

@@ -44,6 +44,7 @@ import scala.reflect.ClassTag
  * @param bytesSpilled for shuffle spill size tracking
  * @param computePidTime partition id computation time metric
  * @param splitTime native split time metric
+ * @param prepareTime native split prepare time metric
  */
 class ColumnarShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
     @transient private val _rdd: RDD[_ <: Product2[K, V]],
@@ -60,7 +61,8 @@ class ColumnarShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
     val computePidTime: SQLMetric,
     val splitTime: SQLMetric,
     val spillTime: SQLMetric,
-    val compressTime: SQLMetric)
+    val compressTime: SQLMetric,
+    val prepareTime: SQLMetric)
   extends ShuffleDependency[K, V, C](
     _rdd,
     partitioner,
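Why the metric rides on the dependency: `SQLMetric` is a Spark accumulator, so an instance created on the driver can be serialized into the shuffle dependency and updated from executor-side writers. A minimal sketch of that pattern, assuming Spark 3.x APIs (`MetricsShuffleDependency` and its fields are illustrative, not the project's class):

```scala
import scala.reflect.ClassTag

import org.apache.spark.{Partitioner, ShuffleDependency}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.execution.metric.SQLMetric

// Illustrative only: a dependency that carries a driver-created timing metric
// so the executor-side shuffle writer can call `prepareTime.add(...)` on it.
class MetricsShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    @transient private val _rdd: RDD[_ <: Product2[K, V]],
    partitioner: Partitioner,
    val prepareTime: SQLMetric)
  extends ShuffleDependency[K, V, C](_rdd, partitioner)
```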
File 2 of 3: ColumnarShuffleWriter.scala

@@ -124,6 +124,7 @@ class ColumnarShuffleWriter[K, V](
       if (cb.numRows == 0 || cb.numCols == 0) {
         logInfo(s"Skip ColumnarBatch of ${cb.numRows} rows, ${cb.numCols} cols")
       } else {
+        val startTimeForPrepare = System.nanoTime()
         val bufAddrs = new ListBuffer[Long]()
         val bufSizes = new ListBuffer[Long]()
         val recordBatch = ConverterUtils.createArrowRecordBatch(cb)
@@ -164,6 +165,7 @@
           }
         }
         firstRecordBatch = false
+        dep.prepareTime.add(System.nanoTime() - startTimeForPrepare)

         jniWrapper.split(nativeSplitter, cb.numRows, bufAddrs.toArray, bufSizes.toArray, firstRecordBatch)
         dep.splitTime.add(System.nanoTime() - startTime)
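The writer brackets the buffer-preparation work with `System.nanoTime()` and adds the delta to the metric just before calling into the native splitter. The same pattern can be factored into a reusable helper; a minimal sketch (the `timedNanos` helper is illustrative, not part of the project):

```scala
import org.apache.spark.sql.execution.metric.SQLMetric

// Illustrative helper: run `body`, accumulating its wall-clock duration
// (in nanoseconds) into the given SQLMetric, even if `body` throws.
def timedNanos[T](metric: SQLMetric)(body: => T): T = {
  val start = System.nanoTime()
  try body
  finally metric.add(System.nanoTime() - start)
}
```

With such a helper the hunk above would read `timedNanos(dep.prepareTime) { /* collect bufAddrs/bufSizes */ }`; the commit instead inlines the start/stop calls, matching how `splitTime` is already measured.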
File 3 of 3: ColumnarShuffleExchangeExec.scala

@@ -77,6 +77,7 @@ case class ColumnarShuffleExchangeExec(
     "splitTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "totaltime_split"),
     "spillTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "shuffle spill time"),
     "compressTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "totaltime_compress"),
+    "prepareTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "totaltime_prepare"),
     "avgReadBatchNumRows" -> SQLMetrics
       .createAverageMetric(sparkContext, "avg read batch num rows"),
     "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
@@ -140,7 +141,8 @@
       longMetric("computePidTime"),
       longMetric("splitTime"),
       longMetric("spillTime"),
-      longMetric("compressTime"))
+      longMetric("compressTime"),
+      longMetric("prepareTime"))
   }

   var cachedShuffleRDD: ShuffledColumnarBatchRDD = _
@@ -187,6 +189,7 @@ class ColumnarShuffleExchangeAdaptor(
     "splitTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "totaltime_split"),
     "spillTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "shuffle spill time"),
     "compressTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "totaltime_compress"),
+    "prepareTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "totaltime_prepare"),
     "avgReadBatchNumRows" -> SQLMetrics
       .createAverageMetric(sparkContext, "avg read batch num rows"),
     "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
@@ -236,7 +239,8 @@
       longMetric("computePidTime"),
       longMetric("splitTime"),
       longMetric("spillTime"),
-      longMetric("compressTime"))
+      longMetric("compressTime"),
+      longMetric("prepareTime"))
   }

   var cachedShuffleRDD: ShuffledColumnarBatchRDD = _
@@ -301,7 +305,8 @@ object ColumnarShuffleExchangeExec extends Logging {
       computePidTime: SQLMetric,
       splitTime: SQLMetric,
       spillTime: SQLMetric,
-      compressTime: SQLMetric): ShuffleDependency[Int, ColumnarBatch, ColumnarBatch] = {
+      compressTime: SQLMetric,
+      prepareTime: SQLMetric): ShuffleDependency[Int, ColumnarBatch, ColumnarBatch] = {
     val arrowFields = outputAttributes.map(attr => ConverterUtils.createArrowField(attr))
     def serializeSchema(fields: Seq[Field]): Array[Byte] = {
       val schema = new Schema(fields.asJava)
@@ -447,7 +452,8 @@ object ColumnarShuffleExchangeExec extends Logging {
       computePidTime = computePidTime,
       splitTime = splitTime,
       spillTime = spillTime,
-      compressTime = compressTime)
+      compressTime = compressTime,
+      prepareTime = prepareTime)

     dependency
   }
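On the driver side, each key in the plan's `metrics` map is materialized with `SQLMetrics.createNanoTimingMetric`, which formats the accumulated nanoseconds as a duration in the Spark UI; `longMetric("prepareTime")` later looks up the same instance by key. A minimal sketch of the declaration, assuming Spark 3.x (`declareShuffleTimings` is an illustrative helper, not the project's code):

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}

// Illustrative only: declare nano-timing metrics keyed the way a SparkPlan's
// `metrics` map is, so a lookup like `longMetric("prepareTime")` can resolve
// the same SQLMetric instance later.
def declareShuffleTimings(sc: SparkContext): Map[String, SQLMetric] = Map(
  "splitTime" -> SQLMetrics.createNanoTimingMetric(sc, "totaltime_split"),
  "prepareTime" -> SQLMetrics.createNanoTimingMetric(sc, "totaltime_prepare"))
```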
