From e614973022431d146f5d376f8dc5c30b5c1a1799 Mon Sep 17 00:00:00 2001 From: Alfred Xu Date: Fri, 19 Jan 2024 22:30:21 +0900 Subject: [PATCH] Dump Parquet Meta as SparkMetrics (#29) Signed-off-by: sperlingxx --- .../rapids/GpuBatchScanExecMetrics.scala | 14 ++- .../com/nvidia/spark/rapids/GpuExec.scala | 3 + .../spark/rapids/GpuMultiFileReader.scala | 92 +++++++++++++++---- .../nvidia/spark/rapids/GpuParquetScan.scala | 25 +++++ .../spark/rapids/GpuTransitionOverrides.scala | 2 +- .../sql/rapids/GpuFileSourceScanExec.scala | 12 ++- 6 files changed, 128 insertions(+), 20 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchScanExecMetrics.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchScanExecMetrics.scala index ef890b1551f..c7ed48f3b63 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchScanExecMetrics.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchScanExecMetrics.scala @@ -36,7 +36,19 @@ trait GpuBatchScanExecMetrics extends GpuExec { lazy val fileCacheMetrics: Map[String, GpuMetric] = { // File cache only supported on Parquet files for now. scan match { - case _: GpuParquetScan | _: GpuOrcScan => createFileCacheMetrics() + case _: GpuParquetScan | _: GpuOrcScan => + createFileCacheMetrics() ++ + // For debugging ByteDance workloads + Map( + "compPageSize" -> createSizeMetric(DEBUG_LEVEL, "compressed page size"), + "unCompPageSize" -> createSizeMetric(DEBUG_LEVEL, "uncompressed page size"), + "nullCount" -> createSizeMetric(DEBUG_LEVEL, "null record count"), + "maxCPR" -> createAverageMetric(DEBUG_LEVEL, "max compression ratio"), + "minCPR" -> createAverageMetric(DEBUG_LEVEL, "min compression ratio"), + "maxFieldSize" -> createAverageMetric(DEBUG_LEVEL, "max size of single field"), + BUFFER_DATA_TIME -> createNanoTimingMetric(DEBUG_LEVEL, BUFFER_DATA_TIME), + BUFFER_META_TIME -> createNanoTimingMetric(DEBUG_LEVEL, BUFFER_META_TIME), + BUFFER_RESIZE_TIME -> createNanoTimingMetric(DEBUG_LEVEL, BUFFER_RESIZE_TIME)) case _ => Map.empty } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala index 2bec8bc581a..7fcf3dac7db 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala @@ -49,6 +49,9 @@ object MetricsLevel { object GpuMetric extends Logging { // Metric names. 
val BUFFER_TIME = "bufferTime" + val BUFFER_DATA_TIME = "bufferDataTime" + val BUFFER_META_TIME = "bufferMetaTime" + val BUFFER_RESIZE_TIME = "bufferResizeTime" val COPY_BUFFER_TIME = "copyBufferTime" val GPU_DECODE_TIME = "gpuDecodeTime" val NUM_INPUT_ROWS = "numInputRows" diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuMultiFileReader.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuMultiFileReader.scala index e4af2e4b6de..7fd48b91365 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuMultiFileReader.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuMultiFileReader.scala @@ -664,6 +664,10 @@ abstract class MultiFileCloudPartitionReaderBase( _ += (blockedTime * fileBufsAndMeta.getBufferTimePct).toLong } + // Update Parquet Meta as Metrics + fileBufsAndMeta.memBuffersAndSizes.foreach { hmbMeta => + hmbMeta.blockMeta.foreach(_.updateMetrics(metrics)) + } TrampolineUtil.incBytesRead(inputMetrics, fileBufsAndMeta.bytesRead) // if we replaced the path with Alluxio, set it to the original filesystem file // since Alluxio replacement is supposed to be transparent to the user @@ -756,6 +760,41 @@ trait DataBlockBase { def getReadDataSize: Long // the block size to be used to slice the whole HostMemoryBuffer def getBlockSize: Long + + case class ColumnStats(compressedSize: Long, + uncompressedSize: Long, + nullCount: Long, + minFieldSize: Int, + maxFieldSize: Int) + + protected def getColumnStatistics: Seq[ColumnStats] = Seq() + + def updateMetrics(metrics: Map[String, GpuMetric]): Unit = { + getColumnStatistics match { + case stats if stats.isEmpty => + case stats => + var minR: Double = 1 + var maxR: Double = 0 + var maxFieldSize: Int = 0 + stats.foreach { s => + metrics("compPageSize") += s.compressedSize + if (s.uncompressedSize > 0) { + metrics("unCompPageSize") += s.uncompressedSize + val r: Double = s.compressedSize.toDouble / s.uncompressedSize.toDouble + maxR = maxR max r + minR = minR min r + } + maxFieldSize = maxFieldSize max s.maxFieldSize + } + metrics("nullCount") += stats.head.nullCount + metrics("maxCPR").set(metrics("maxCPR").value max ((1.0 - minR) * 100000L).toLong) + if (metrics("minCPR").value == 0L) { + metrics("minCPR").set(100000L) + } + metrics("minCPR").set(metrics("minCPR").value min ((1.0 - maxR) * 100000L).toLong) + metrics("maxFieldSize").set(metrics("maxFieldSize").value max (maxFieldSize * 10L)) + } + } } /** @@ -1058,12 +1097,16 @@ abstract class MultiFileCoalescingPartitionReaderBase( if (currentChunkMeta.currentChunk.isEmpty) { CachedGpuBatchIterator(EmptyTableReader, colTypes) } else { + currentChunkMeta.currentChunk.foreach(_._2.updateMetrics(metrics)) + val (dataBuffer, dataSize) = readPartFiles(currentChunkMeta.currentChunk, currentChunkMeta.clippedSchema) + if (dataSize == 0) { dataBuffer.close() CachedGpuBatchIterator(EmptyTableReader, colTypes) } else { + startNewBufferRetry RmmRapidsRetryIterator.withRetry(dataBuffer, chunkedSplit(_)) { _ => // We don't want to actually close the host buffer until we know that we don't @@ -1110,7 +1153,11 @@ abstract class MultiFileCoalescingPartitionReaderBase( val batchContext = createBatchContext(filesAndBlocks, clippedSchema) // First, estimate the output file size for the initial allocating. 
// the estimated size should be >= size of HEAD + Blocks + FOOTER - val initTotalSize = calculateEstimatedBlocksOutputSize(batchContext) + val initTotalSize = + withResource(new NvtxWithMetrics("Buffer size eval", NvtxColor.ORANGE, + metrics("bufferMetaTime"))) { _ => + calculateEstimatedBlocksOutputSize(batchContext) + } val (buffer, bufferSize, footerOffset, outBlocks) = closeOnExcept(HostMemoryBuffer.allocate(initTotalSize)) { hmb => // Second, write header @@ -1129,15 +1176,21 @@ abstract class MultiFileCoalescingPartitionReaderBase( offset += fileBlockSize } - for (future <- tasks.asScala) { - val (blocks, bytesRead) = future.get() - allOutputBlocks ++= blocks - TrampolineUtil.incBytesRead(inputMetrics, bytesRead) + withResource(new NvtxWithMetrics("Buffer read data", NvtxColor.PURPLE, + metrics("bufferDataTime"))) { _ => + for (future <- tasks.asScala) { + val (blocks, bytesRead) = future.get() + allOutputBlocks ++= blocks + TrampolineUtil.incBytesRead(inputMetrics, bytesRead) + } } // Fourth, calculate the final buffer size - val finalBufferSize = calculateFinalBlocksOutputSize(offset, allOutputBlocks.toSeq, - batchContext) + val finalBufferSize = withResource(new NvtxWithMetrics("Buffer size eval", + NvtxColor.RED, metrics("bufferMetaTime"))) { _ => + calculateFinalBlocksOutputSize(offset, allOutputBlocks.toSeq, + batchContext) + } (hmb, finalBufferSize, offset, allOutputBlocks.toSeq) } @@ -1154,14 +1207,17 @@ abstract class MultiFileCoalescingPartitionReaderBase( s"reallocating and copying data to bigger buffer size: $bufferSize") } // Copy the old buffer to a new allocated bigger buffer and close the old buffer - buf = withResource(buffer) { _ => - withResource(new HostMemoryInputStream(buffer, footerOffset)) { in => - // realloc memory and copy - closeOnExcept(HostMemoryBuffer.allocate(bufferSize)) { newhmb => - withResource(new HostMemoryOutputStream(newhmb)) { out => - IOUtils.copy(in, out) + buf = withResource(new NvtxWithMetrics("Buffer resize time", + NvtxColor.RED, metrics("bufferResizeTime"))) { _ => + withResource(buffer) { _ => + withResource(new HostMemoryInputStream(buffer, footerOffset)) { in => + // realloc memory and copy + closeOnExcept(HostMemoryBuffer.allocate(bufferSize)) { newhmb => + withResource(new HostMemoryOutputStream(newhmb)) { out => + IOUtils.copy(in, out) + } + newhmb } - newhmb } } } @@ -1175,9 +1231,11 @@ abstract class MultiFileCoalescingPartitionReaderBase( // Closing the original buf and returning a new allocated buffer is allowed, but there is no // reason to do that. // If you have to do this, please think about to add other abstract methods first. 
- val (finalBuffer, finalBufferSize) = writeFileFooter(buf, totalBufferSize, footerOffset, - outBlocks, batchContext) - + val (finalBuffer, finalBufferSize) = withResource(new NvtxWithMetrics("Buffer write footer", + NvtxColor.WHITE, metrics("bufferMetaTime"))) { _ => + writeFileFooter(buf, totalBufferSize, footerOffset, + outBlocks, batchContext) + } closeOnExcept(finalBuffer) { _ => // triple check we didn't go over memory if (finalBufferSize > totalBufferSize) { diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala index 207b6ddaa9b..d33808dd57e 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala @@ -46,6 +46,7 @@ import org.apache.hadoop.fs.{FSDataInputStream, Path} import org.apache.parquet.bytes.BytesUtils import org.apache.parquet.bytes.BytesUtils.readIntLittleEndian import org.apache.parquet.column.ColumnDescriptor +import org.apache.parquet.column.statistics.{BinaryStatistics, BooleanStatistics, DoubleStatistics, FloatStatistics, IntStatistics, LongStatistics} import org.apache.parquet.filter2.predicate.FilterApi import org.apache.parquet.format.converter.ParquetMetadataConverter import org.apache.parquet.hadoop.{ParquetFileReader, ParquetInputFormat} @@ -1596,6 +1597,7 @@ trait ParquetPartitionReaderBase extends Logging with ScanWithMetrics val copyBuffer: Array[Byte] = new Array[Byte](copyBufferSize) withResource(filePath.getFileSystem(fileHadoopConf).open(filePath)) { in => coalescedRanges.foreach { blockCopy => + totalBytesCopied += copyDataRange(blockCopy, in, out, copyBuffer) } } @@ -1819,6 +1821,29 @@ private case class ParquetDataBlock(dataBlock: BlockMetaData) extends DataBlockB override def getRowCount: Long = dataBlock.getRowCount override def getReadDataSize: Long = dataBlock.getTotalByteSize override def getBlockSize: Long = dataBlock.getColumns.asScala.map(_.getTotalSize).sum + + override protected def getColumnStatistics: Seq[ColumnStats] = { + dataBlock.getColumns.asScala.map { c => + val (nullCnt, minField, maxField) = c.getStatistics match { + case s: BinaryStatistics => + (s.getNumNulls, s.genericGetMin.length, s.genericGetMax.length) + case s: BooleanStatistics => + (s.getNumNulls, 1, 1) + case s: IntStatistics => + (s.getNumNulls, 4, 4) + case s: LongStatistics => + (s.getNumNulls, 8, 8) + case s: FloatStatistics => + (s.getNumNulls, 4, 4) + case s: DoubleStatistics => + (s.getNumNulls, 8, 8) + case s => + throw new Exception(s"Invalid value $s") + } + ColumnStats(c.getTotalSize, c.getTotalUncompressedSize, + nullCnt, minField, maxField) + }.toSeq + } } /** Parquet extra information containing rebase modes and whether there is int96 timestamp */ diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala index 20a9482b70c..01e0b91bcd5 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2023, NVIDIA CORPORATION. + * Copyright (c) 2019-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
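Note on the maxCPR/minCPR encoding introduced in the GpuMultiFileReader.scala hunk above: DataBlockBase.updateMetrics derives a per-column-chunk ratio r = compressedSize / uncompressedSize, tracks its minimum and maximum across the block, and stores 1 - r scaled by 100000 so the floating-point ratio fits into a long-valued GpuMetric. The standalone sketch below (object and method names are illustrative only, not part of the patch) shows the arithmetic in isolation: a 100 KiB page that compresses to 25 KiB yields a ratio of 0.75 and is recorded as 75000.

    // Minimal sketch of the fixed-point encoding used by the new maxCPR/minCPR
    // metrics. Names here are illustrative; only the 100000L scale factor comes
    // from DataBlockBase.updateMetrics in this patch.
    object CompressionRatioEncoding {
      private val Scale = 100000L

      // Compression ratio of a column chunk: 1 - compressed/uncompressed,
      // so 0.75 means the data shrank to a quarter of its original size.
      def ratio(compressedSize: Long, uncompressedSize: Long): Double =
        1.0 - compressedSize.toDouble / uncompressedSize.toDouble

      // Fixed-point encoding applied before the value is stored in a
      // GpuMetric, whose underlying value is a Long.
      def encode(r: Double): Long = (r * Scale).toLong

      def main(args: Array[String]): Unit = {
        val r = ratio(25 * 1024L, 100 * 1024L) // 0.75
        println(encode(r))                     // 75000
      }
    }

The maxFieldSize metric uses the same idea, storing the byte count multiplied by 10 before it goes into the long-valued metric.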
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileSourceScanExec.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileSourceScanExec.scala index 80acf2b0f9e..47507480b9f 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileSourceScanExec.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileSourceScanExec.scala @@ -429,7 +429,17 @@ case class GpuFileSourceScanExec( relation.fileFormat match { case _: GpuReadParquetFileFormat | _: GpuOrcFileFormat => Map(READ_FS_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_READ_FS_TIME), - WRITE_BUFFER_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_WRITE_BUFFER_TIME)) + WRITE_BUFFER_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_WRITE_BUFFER_TIME), + // For debugging ByteDance workloads + "compPageSize" -> createSizeMetric(DEBUG_LEVEL, "compressed page size"), + "unCompPageSize" -> createSizeMetric(DEBUG_LEVEL, "uncompressed page size"), + "nullCount" -> createSizeMetric(DEBUG_LEVEL, "null record count"), + "maxCPR" -> createAverageMetric(DEBUG_LEVEL, "max compression ratio"), + "minCPR" -> createAverageMetric(DEBUG_LEVEL, "min compression ratio"), + "maxFieldSize" -> createAverageMetric(DEBUG_LEVEL, "max size of single field"), + BUFFER_DATA_TIME -> createNanoTimingMetric(DEBUG_LEVEL, BUFFER_DATA_TIME), + BUFFER_META_TIME -> createNanoTimingMetric(DEBUG_LEVEL, BUFFER_META_TIME), + BUFFER_RESIZE_TIME -> createNanoTimingMetric(DEBUG_LEVEL, BUFFER_RESIZE_TIME)) case _ => Map.empty[String, GpuMetric] }
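The nine debug metrics registered here for GpuFileSourceScanExec are identical to the ones added to GpuBatchScanExecMetrics at the top of this patch. As a hedged sketch of a possible follow-up only (createParquetMetaDebugMetrics is a hypothetical name, and the visibility of the create*Metric helpers from both call sites would need to be confirmed), the duplicated map could live in one place, for example on GpuExec:

    // Hypothetical shared helper; the entries are copied verbatim from the two
    // call sites in this patch, only the method itself is new.
    protected def createParquetMetaDebugMetrics(): Map[String, GpuMetric] = Map(
      "compPageSize" -> createSizeMetric(DEBUG_LEVEL, "compressed page size"),
      "unCompPageSize" -> createSizeMetric(DEBUG_LEVEL, "uncompressed page size"),
      "nullCount" -> createSizeMetric(DEBUG_LEVEL, "null record count"),
      "maxCPR" -> createAverageMetric(DEBUG_LEVEL, "max compression ratio"),
      "minCPR" -> createAverageMetric(DEBUG_LEVEL, "min compression ratio"),
      "maxFieldSize" -> createAverageMetric(DEBUG_LEVEL, "max size of single field"),
      BUFFER_DATA_TIME -> createNanoTimingMetric(DEBUG_LEVEL, BUFFER_DATA_TIME),
      BUFFER_META_TIME -> createNanoTimingMetric(DEBUG_LEVEL, BUFFER_META_TIME),
      BUFFER_RESIZE_TIME -> createNanoTimingMetric(DEBUG_LEVEL, BUFFER_RESIZE_TIME))

Both existing sites could then append createParquetMetaDebugMetrics() to their metric maps instead of repeating the literals.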