From 7c982fcf2a1a20897d71029ba1eb561894ead714 Mon Sep 17 00:00:00 2001
From: Raza Jafri
Date: Tue, 30 Apr 2024 14:13:02 -0700
Subject: [PATCH] Revert "Fix Multithreaded Readers working with Unity Catalog
 on Databricks [databricks] (#8296)"

This reverts commit 5f4071191ab5b60b67f12dcf92693b2e233b8ad9.

---
 .../nvidia/spark/rapids/GpuParquetScan.scala | 22 ++++++++-----------
 1 file changed, 9 insertions(+), 13 deletions(-)

diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala
index 4f140f27bf36..df071e3a4b95 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala
@@ -38,7 +38,7 @@ import com.nvidia.spark.rapids.RapidsConf.ParquetFooterReaderType
 import com.nvidia.spark.rapids.RapidsPluginImplicits._
 import com.nvidia.spark.rapids.filecache.FileCache
 import com.nvidia.spark.rapids.jni.{DateTimeRebase, ParquetFooter}
-import com.nvidia.spark.rapids.shims.{ColumnDefaultValuesShims, GpuParquetCrypto, GpuTypeShims, ParquetLegacyNanoAsLongShims, ParquetSchemaClipShims, ParquetStringPredShims, ReaderUtils, ShimFilePartitionReaderFactory, SparkShimImpl}
+import com.nvidia.spark.rapids.shims.{ColumnDefaultValuesShims, GpuParquetCrypto, GpuTypeShims, ParquetLegacyNanoAsLongShims, ParquetSchemaClipShims, ParquetStringPredShims, ShimFilePartitionReaderFactory, SparkShimImpl}
 import org.apache.commons.io.IOUtils
 import org.apache.commons.io.output.{CountingOutputStream, NullOutputStream}
 import org.apache.hadoop.conf.Configuration
@@ -683,12 +683,10 @@ private case class GpuParquetFileFilterHandler(
         conf.unset(encryptConf)
       }
     }
-    val fileHadoopConf =
-      ReaderUtils.getHadoopConfForReaderThread(new Path(file.filePath.toString), conf)
     val footer: ParquetMetadata = try {
       footerReader match {
         case ParquetFooterReaderType.NATIVE =>
-          val serialized = withResource(readAndFilterFooter(file, fileHadoopConf,
+          val serialized = withResource(readAndFilterFooter(file, conf,
             readDataSchema, filePath)) { tableFooter =>
             if (tableFooter.getNumColumns <= 0) {
               // Special case because java parquet reader does not like having 0 columns.
@@ -712,7 +710,7 @@ private case class GpuParquetFileFilterHandler(
             }
           }
         case _ =>
-          readAndSimpleFilterFooter(file, fileHadoopConf, filePath)
+          readAndSimpleFilterFooter(file, conf, filePath)
       }
     } catch {
       case e if GpuParquetCrypto.isColumnarCryptoException(e) =>
@@ -739,9 +737,9 @@ private case class GpuParquetFileFilterHandler(
     val blocks = if (pushedFilters.isDefined) {
       withResource(new NvtxRange("getBlocksWithFilter", NvtxColor.CYAN)) { _ =>
         // Use the ParquetFileReader to perform dictionary-level filtering
-        ParquetInputFormat.setFilterPredicate(fileHadoopConf, pushedFilters.get)
+        ParquetInputFormat.setFilterPredicate(conf, pushedFilters.get)
         //noinspection ScalaDeprecation
-        withResource(new ParquetFileReader(fileHadoopConf, footer.getFileMetaData, filePath,
+        withResource(new ParquetFileReader(conf, footer.getFileMetaData, filePath,
           footer.getBlocks, Collections.emptyList[ColumnDescriptor])) { parquetReader =>
           parquetReader.getRowGroups
         }
@@ -1545,14 +1543,13 @@ trait ParquetPartitionReaderBase extends Logging with ScanWithMetrics
     val filePathString: String = filePath.toString
     val remoteItems = new ArrayBuffer[CopyRange](blocks.length)
     var totalBytesToCopy = 0L
-    val fileHadoopConf = ReaderUtils.getHadoopConfForReaderThread(filePath, conf)
     withResource(new ArrayBuffer[LocalCopy](blocks.length)) { localItems =>
       blocks.foreach { block =>
         block.getColumns.asScala.foreach { column =>
           val columnSize = column.getTotalSize
           val outputOffset = totalBytesToCopy + startPos
           val channel = FileCache.get.getDataRangeChannel(filePathString,
-            column.getStartingPos, columnSize, fileHadoopConf)
+            column.getStartingPos, columnSize, conf)
           if (channel.isDefined) {
             localItems += LocalCopy(channel.get, columnSize, outputOffset)
           } else {
@@ -1583,14 +1580,13 @@ trait ParquetPartitionReaderBase extends Logging with ScanWithMetrics
       return 0L
     }
 
-    val fileHadoopConf = ReaderUtils.getHadoopConfForReaderThread(filePath, conf)
     val coalescedRanges = coalesceReads(remoteCopies)
 
     val totalBytesCopied = PerfIO.readToHostMemory(
-      fileHadoopConf, out.buffer, filePath.toUri,
+      conf, out.buffer, filePath.toUri,
       coalescedRanges.map(r => IntRangeWithOffset(r.offset, r.length, r.outputOffset))
     ).getOrElse {
-      withResource(filePath.getFileSystem(fileHadoopConf).open(filePath)) { in =>
+      withResource(filePath.getFileSystem(conf).open(filePath)) { in =>
        val copyBuffer: Array[Byte] = new Array[Byte](copyBufferSize)
        coalescedRanges.foldLeft(0L) { (acc, blockCopy) =>
          acc + copyDataRange(blockCopy, in, out, copyBuffer)
@@ -1602,7 +1598,7 @@ trait ParquetPartitionReaderBase extends Logging with ScanWithMetrics
           metrics.getOrElse(GpuMetric.FILECACHE_DATA_RANGE_MISSES, NoopMetric) += 1
           metrics.getOrElse(GpuMetric.FILECACHE_DATA_RANGE_MISSES_SIZE, NoopMetric) += range.length
           val cacheToken = FileCache.get.startDataRangeCache(
-            filePathString, range.offset, range.length, fileHadoopConf)
+            filePathString, range.offset, range.length, conf)
           // If we get a filecache token then we can complete the caching by providing the data.
           // If we do not get a token then we should not cache this data.
           cacheToken.foreach { token =>
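
Reviewer note: the sketch below only illustrates the shape of the change made at each call site; it is not part of the patch. The one fact taken from the hunks above is that ReaderUtils.getHadoopConfForReaderThread accepts a Path and a Configuration and yields a Configuration that was threaded through the footer, file-cache, and filesystem calls. ReaderUtilsSketch, CallSitePattern, and the helper body are hypothetical placeholders, not the real spark-rapids code.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataInputStream, Path}

// Hypothetical stand-in for the shim helper this revert removes. Only the
// signature (Path, Configuration) => Configuration comes from the patch;
// returning the shared conf unchanged is an assumption made for the sketch.
object ReaderUtilsSketch {
  def getHadoopConfForReaderThread(path: Path, conf: Configuration): Configuration = conf
}

object CallSitePattern {
  // Before the revert: a per-file configuration was derived once and then
  // passed to the footer reader, the file cache, and filesystem opens.
  def openBefore(filePath: Path, conf: Configuration): FSDataInputStream = {
    val fileHadoopConf = ReaderUtilsSketch.getHadoopConfForReaderThread(filePath, conf)
    filePath.getFileSystem(fileHadoopConf).open(filePath)
  }

  // After the revert: the shared task-level Hadoop configuration is used directly.
  def openAfter(filePath: Path, conf: Configuration): FSDataInputStream =
    filePath.getFileSystem(conf).open(filePath)
}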