
Commit fcc7700

Revert "Fix Multithreaded Readers working with Unity Catalog on Datab…
Browse files Browse the repository at this point in the history
…ricks [databricks] (NVIDIA#8296)"
  • Loading branch information
razajafri committed Apr 30, 2024
1 parent 75c683e commit fcc7700
Showing 3 changed files with 9 additions and 117 deletions.
@@ -38,7 +38,7 @@ import com.nvidia.spark.rapids.RapidsConf.ParquetFooterReaderType
 import com.nvidia.spark.rapids.RapidsPluginImplicits._
 import com.nvidia.spark.rapids.filecache.FileCache
 import com.nvidia.spark.rapids.jni.{DateTimeRebase, ParquetFooter}
-import com.nvidia.spark.rapids.shims.{ColumnDefaultValuesShims, GpuParquetCrypto, GpuTypeShims, ParquetLegacyNanoAsLongShims, ParquetSchemaClipShims, ParquetStringPredShims, ReaderUtils, ShimFilePartitionReaderFactory, SparkShimImpl}
+import com.nvidia.spark.rapids.shims.{ColumnDefaultValuesShims, GpuParquetCrypto, GpuTypeShims, ParquetLegacyNanoAsLongShims, ParquetSchemaClipShims, ParquetStringPredShims, ShimFilePartitionReaderFactory, SparkShimImpl}
 import org.apache.commons.io.IOUtils
 import org.apache.commons.io.output.{CountingOutputStream, NullOutputStream}
 import org.apache.hadoop.conf.Configuration
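
The `ReaderUtils` import dropped above is the shim that gave each reader thread a per-file Hadoop `Configuration` (see the `getHadoopConfForReaderThread` calls removed in the hunks below); the revert goes back to passing the shared task-level `conf` everywhere. Purely as an illustration of the idea, and not the deleted shim's actual implementation, such a helper might clone the task configuration and layer on path-specific settings:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Hypothetical sketch only: the real ReaderUtils shim is deleted by this commit and
// its logic is not reproduced here.
object IllustrativeReaderUtils {
  def getHadoopConfForReaderThread(path: Path, conf: Configuration): Configuration = {
    // Copy the task conf so per-file tweaks made on a reader thread cannot leak
    // into the configuration shared with other threads.
    val perFileConf = new Configuration(conf)
    // Example of a per-path override keyed off the file's location; the real key
    // names (e.g. per-bucket credential settings) depend on the filesystem in use.
    Option(path.toUri.getAuthority).foreach { authority =>
      perFileConf.setIfUnset("example.reader.authority", authority)
    }
    perFileConf
  }
}
```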
@@ -683,12 +683,10 @@ private case class GpuParquetFileFilterHandler(
         conf.unset(encryptConf)
       }
     }
-    val fileHadoopConf =
-      ReaderUtils.getHadoopConfForReaderThread(new Path(file.filePath.toString), conf)
     val footer: ParquetMetadata = try {
       footerReader match {
         case ParquetFooterReaderType.NATIVE =>
-          val serialized = withResource(readAndFilterFooter(file, fileHadoopConf,
+          val serialized = withResource(readAndFilterFooter(file, conf,
             readDataSchema, filePath)) { tableFooter =>
             if (tableFooter.getNumColumns <= 0) {
               // Special case because java parquet reader does not like having 0 columns.
@@ -712,7 +710,7 @@
             }
           }
         case _ =>
-          readAndSimpleFilterFooter(file, fileHadoopConf, filePath)
+          readAndSimpleFilterFooter(file, conf, filePath)
       }
     } catch {
       case e if GpuParquetCrypto.isColumnarCryptoException(e) =>
@@ -739,9 +737,9 @@
     val blocks = if (pushedFilters.isDefined) {
       withResource(new NvtxRange("getBlocksWithFilter", NvtxColor.CYAN)) { _ =>
         // Use the ParquetFileReader to perform dictionary-level filtering
-        ParquetInputFormat.setFilterPredicate(fileHadoopConf, pushedFilters.get)
+        ParquetInputFormat.setFilterPredicate(conf, pushedFilters.get)
         //noinspection ScalaDeprecation
-        withResource(new ParquetFileReader(fileHadoopConf, footer.getFileMetaData, filePath,
+        withResource(new ParquetFileReader(conf, footer.getFileMetaData, filePath,
           footer.getBlocks, Collections.emptyList[ColumnDescriptor])) { parquetReader =>
           parquetReader.getRowGroups
         }
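
For reference, the dictionary-level filtering in this hunk uses stock parquet-mr APIs: the pushed-down predicate is registered on the Hadoop `Configuration` (after this revert, the shared `conf` again), and a `ParquetFileReader` built from the already-parsed footer keeps only the row groups that survive the filter. A self-contained sketch of that pattern, outside the plugin's helpers, could look like this:

```scala
import java.util.Collections

import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.column.ColumnDescriptor
import org.apache.parquet.filter2.predicate.FilterPredicate
import org.apache.parquet.hadoop.{ParquetFileReader, ParquetInputFormat}
import org.apache.parquet.hadoop.metadata.{BlockMetaData, ParquetMetadata}

object RowGroupFilterSketch {
  // Given an already-parsed footer and an optional pushed-down predicate, return the
  // row groups that remain after parquet-mr applies its row-group filtering.
  def filterRowGroups(
      conf: Configuration,
      filePath: Path,
      footer: ParquetMetadata,
      pushedFilter: Option[FilterPredicate]): Seq[BlockMetaData] = {
    pushedFilter match {
      case Some(pred) =>
        ParquetInputFormat.setFilterPredicate(conf, pred)
        // The deprecated constructor is used so the footer does not have to be re-read.
        val reader = new ParquetFileReader(conf, footer.getFileMetaData, filePath,
          footer.getBlocks, Collections.emptyList[ColumnDescriptor])
        try {
          reader.getRowGroups.asScala.toSeq
        } finally {
          reader.close()
        }
      case None =>
        footer.getBlocks.asScala.toSeq
    }
  }
}
```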
@@ -1545,14 +1543,13 @@ trait ParquetPartitionReaderBase extends Logging with ScanWithMetrics
     val filePathString: String = filePath.toString
     val remoteItems = new ArrayBuffer[CopyRange](blocks.length)
     var totalBytesToCopy = 0L
-    val fileHadoopConf = ReaderUtils.getHadoopConfForReaderThread(filePath, conf)
     withResource(new ArrayBuffer[LocalCopy](blocks.length)) { localItems =>
       blocks.foreach { block =>
         block.getColumns.asScala.foreach { column =>
           val columnSize = column.getTotalSize
           val outputOffset = totalBytesToCopy + startPos
           val channel = FileCache.get.getDataRangeChannel(filePathString,
-            column.getStartingPos, columnSize, fileHadoopConf)
+            column.getStartingPos, columnSize, conf)
           if (channel.isDefined) {
             localItems += LocalCopy(channel.get, columnSize, outputOffset)
           } else {
@@ -1583,14 +1580,13 @@ trait ParquetPartitionReaderBase extends Logging with ScanWithMetrics
       return 0L
     }

-    val fileHadoopConf = ReaderUtils.getHadoopConfForReaderThread(filePath, conf)
     val coalescedRanges = coalesceReads(remoteCopies)

     val totalBytesCopied = PerfIO.readToHostMemory(
-      fileHadoopConf, out.buffer, filePath.toUri,
+      conf, out.buffer, filePath.toUri,
       coalescedRanges.map(r => IntRangeWithOffset(r.offset, r.length, r.outputOffset))
     ).getOrElse {
-      withResource(filePath.getFileSystem(fileHadoopConf).open(filePath)) { in =>
+      withResource(filePath.getFileSystem(conf).open(filePath)) { in =>
         val copyBuffer: Array[Byte] = new Array[Byte](copyBufferSize)
         coalescedRanges.foldLeft(0L) { (acc, blockCopy) =>
           acc + copyDataRange(blockCopy, in, out, copyBuffer)
@@ -1602,7 +1598,7 @@ trait ParquetPartitionReaderBase extends Logging with ScanWithMetrics
       metrics.getOrElse(GpuMetric.FILECACHE_DATA_RANGE_MISSES, NoopMetric) += 1
       metrics.getOrElse(GpuMetric.FILECACHE_DATA_RANGE_MISSES_SIZE, NoopMetric) += range.length
       val cacheToken = FileCache.get.startDataRangeCache(
-        filePathString, range.offset, range.length, fileHadoopConf)
+        filePathString, range.offset, range.length, conf)
       // If we get a filecache token then we can complete the caching by providing the data.
       // If we do not get a token then we should not cache this data.
       cacheToken.foreach { token =>
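
After this revert the cache-miss fallback opens the file with the shared `conf` again. The fallback itself is plain Hadoop I/O: open the file once, then for each coalesced range seek to its offset and stream it into the output through a reusable copy buffer. A simplified, self-contained sketch of that read loop (the range type here is a stand-in, not the plugin's CopyRange):

```scala
import java.io.OutputStream

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object RemoteCopySketch {
  // Stand-in for a coalesced copy range: where the bytes live in the source file.
  case class ReadRange(offset: Long, length: Long)

  // Sketch of the cache-miss path: copy each range from the filesystem into `out`,
  // reusing a single buffer, and return the total number of bytes copied.
  def copyRanges(
      conf: Configuration,
      filePath: Path,
      ranges: Seq[ReadRange],
      out: OutputStream,
      copyBufferSize: Int = 128 * 1024): Long = {
    val in = filePath.getFileSystem(conf).open(filePath)
    try {
      val copyBuffer = new Array[Byte](copyBufferSize)
      ranges.foldLeft(0L) { (total, range) =>
        in.seek(range.offset)
        var remaining = range.length
        while (remaining > 0) {
          val toRead = math.min(remaining, copyBuffer.length.toLong).toInt
          in.readFully(copyBuffer, 0, toRead)
          out.write(copyBuffer, 0, toRead)
          remaining -= toRead
        }
        total + range.length
      }
    } finally {
      in.close()
    }
  }
}
```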

This file was deleted.

This file was deleted.
