Revert "Semaphore optimization in scan"
This reverts commit 71c8166.
firestarman committed Apr 29, 2024
1 parent 53c5365 commit 3d2d44a
Showing 2 changed files with 3 additions and 11 deletions.
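For context, the optimization being reverted (71c8166) had the scan readers drop the GPU semaphore as soon as a batch read started, since the read begins with IO and the GPU is not needed until the buffered data is decoded into columnar batches. Below is a minimal sketch of that pattern, not the actual reader code: GpuSemaphore and TaskContext are the real classes seen in this diff, while readHostData and buildBatchOnGpu are hypothetical stand-ins for the IO-bound and GPU-bound phases.

// Minimal sketch of the reverted pattern, assuming the spark-rapids
// GpuSemaphore API shown in this diff; readHostData and buildBatchOnGpu
// are hypothetical stubs, not spark-rapids functions.
import org.apache.spark.TaskContext
import com.nvidia.spark.rapids.GpuSemaphore

object SemaphoreScanSketch {
  def readBatchSketch(): Unit = {
    // Reverted optimization: the read starts with IO, so release the GPU
    // semaphore and let other tasks use the GPU while this task blocks on IO.
    GpuSemaphore.releaseIfNecessary(TaskContext.get())

    val hostData = readHostData() // IO-bound: fetch and buffer file data on the host

    // Someone is going to process this data on the GPU, so re-acquire the
    // semaphore right before device work begins (this call survives the revert).
    GpuSemaphore.acquireIfNecessary(TaskContext.get())
    buildBatchOnGpu(hostData)     // GPU-bound: decode the buffer into a batch
  }

  // Hypothetical stubs standing in for the real IO and GPU phases.
  private def readHostData(): Array[Byte] = Array.emptyByteArray
  private def buildBatchOnGpu(data: Array[Byte]): Unit = ()
}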
@@ -639,8 +639,6 @@ abstract class MultiFileCloudPartitionReaderBase(
         return true
       }
 
-      // Read starts with IO operations, so leaving GPU for a while.
-      GpuSemaphore.releaseIfNecessary(TaskContext.get())
       // Temporary until we get more to read
       batchIter = EmptyGpuColumnarBatchIterator
       // if we have batch left from the last file read return it
@@ -1033,9 +1031,6 @@ abstract class MultiFileCoalescingPartitionReaderBase(
   def startNewBufferRetry: Unit = ()
 
   private def readBatch(): Iterator[ColumnarBatch] = {
-    val taskContext = TaskContext.get()
-    // Read begins with IO operations, so leaving GPU for a while.
-    GpuSemaphore.releaseIfNecessary(taskContext)
     withResource(new NvtxRange(s"$getFileFormatShortName readBatch", NvtxColor.GREEN)) { _ =>
       val currentChunkMeta = populateCurrentBlockChunk()
       val batchIter = if (currentChunkMeta.clippedSchema.isEmpty) {
@@ -1045,7 +1040,7 @@ abstract class MultiFileCoalescingPartitionReaderBase(
       } else {
         val rows = currentChunkMeta.numTotalRows.toInt
         // Someone is going to process this data, even if it is just a row count
-        GpuSemaphore.acquireIfNecessary(taskContext)
+        GpuSemaphore.acquireIfNecessary(TaskContext.get())
         val nullColumns = currentChunkMeta.readSchema.safeMap(f =>
           GpuColumnVector.fromNull(rows, f.dataType).asInstanceOf[SparkVector])
         val emptyBatch = new ColumnarBatch(nullColumns.toArray, rows)
@@ -2783,9 +2783,6 @@ class ParquetPartitionReader(
   }
 
   private def readBatches(): Iterator[ColumnarBatch] = {
-    val taskContext = TaskContext.get()
-    // Read starts with IO operations, so leaving GPU for a while.
-    GpuSemaphore.releaseIfNecessary(taskContext)
     withResource(new NvtxRange("Parquet readBatch", NvtxColor.GREEN)) { _ =>
       val currentChunkedBlocks = populateCurrentBlockChunk(blockIterator,
         maxReadBatchSizeRows, maxReadBatchSizeBytes, readDataSchema)
@@ -2796,7 +2793,7 @@ class ParquetPartitionReader(
         EmptyGpuColumnarBatchIterator
       } else {
         // Someone is going to process this data, even if it is just a row count
-        GpuSemaphore.acquireIfNecessary(taskContext)
+        GpuSemaphore.acquireIfNecessary(TaskContext.get())
         val nullColumns = readDataSchema.safeMap(f =>
           GpuColumnVector.fromNull(numRows, f.dataType).asInstanceOf[SparkVector])
         new SingleGpuColumnarBatchIterator(new ColumnarBatch(nullColumns.toArray, numRows))
@@ -2815,7 +2812,7 @@ class ParquetPartitionReader(
       CachedGpuBatchIterator(EmptyTableReader, colTypes)
     } else {
       // about to start using the GPU
-      GpuSemaphore.acquireIfNecessary(taskContext)
+      GpuSemaphore.acquireIfNecessary(TaskContext.get())
 
       RmmRapidsRetryIterator.withRetryNoSplit(dataBuffer) { _ =>
         // Inc the ref count because MakeParquetTableProducer will try to close the dataBuffer
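Net effect of the revert: readBatch and readBatches no longer release the GPU semaphore at the start of a read, and with the cached taskContext val removed, the surviving acquireIfNecessary call sites fetch the context inline via TaskContext.get().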
