Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix collect time metric in CoalesceBatches #729

Merged
merged 4 commits into from
Sep 11, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ abstract class AbstractGpuCoalesceIterator(origIter: Iterator[ColumnarBatch],
private val iter = new RemoveEmptyBatchIterator(origIter, numInputBatches)
private var onDeck: Option[ColumnarBatch] = None
private var batchInitialized: Boolean = false
private var collectMetric: Option[MetricRange] = None
private var totalMetric: Option[MetricRange] = None

/** We need to track the sizes of string columns to make sure we don't exceed 2GB */
private val stringFieldIndices: Array[Int] = schema.fields.zipWithIndex
Expand All @@ -174,7 +176,22 @@ abstract class AbstractGpuCoalesceIterator(origIter: Iterator[ColumnarBatch],
Option(TaskContext.get())
.foreach(_.addTaskCompletionListener[Unit](_ => onDeck.foreach(_.close())))

override def hasNext: Boolean = onDeck.isDefined || iter.hasNext
override def hasNext: Boolean = {
if (!collectMetric.isDefined) {
// use one being not set as indicator that neither are intialized to avoid
// 2 checks or extra initialized variable
collectMetric = Some(new MetricRange(collectTime))
totalMetric = Some(new MetricRange(totalTime))
}
val res = onDeck.isDefined || iter.hasNext
if (!res) {
collectMetric.foreach(_.close())
collectMetric = None
totalMetric.foreach(_.close())
totalMetric = None
}
res
}

/**
* Called first to initialize any state needed for a new batch to be created.
Expand Down Expand Up @@ -236,9 +253,6 @@ abstract class AbstractGpuCoalesceIterator(origIter: Iterator[ColumnarBatch],
* @return The coalesced batch
*/
override def next(): ColumnarBatch = {

val total = new MetricRange(totalTime)

// reset batch state
batchInitialized = false
batchRowLimit = 0
Expand All @@ -261,7 +275,6 @@ abstract class AbstractGpuCoalesceIterator(origIter: Iterator[ColumnarBatch],
numBytes += columnSizes.sum
}

val collect = new MetricRange(collectTime)
try {

// there is a hard limit of 2^31 rows
Expand Down Expand Up @@ -339,7 +352,8 @@ abstract class AbstractGpuCoalesceIterator(origIter: Iterator[ColumnarBatch],
s"and $numBytes bytes")

} finally {
collect.close()
collectMetric.foreach(_.close())
collectMetric = None
}

val concatRange = new NvtxWithMetrics(s"$opName concat", NvtxColor.CYAN, concatTime)
Expand All @@ -351,7 +365,8 @@ abstract class AbstractGpuCoalesceIterator(origIter: Iterator[ColumnarBatch],
ret
} finally {
cleanupConcatIsDone()
total.close()
totalMetric.foreach(_.close())
totalMetric = None
}
}

Expand Down