diff --git a/core/src/main/scala/org/apache/spark/InternalAccumulator.scala b/core/src/main/scala/org/apache/spark/InternalAccumulator.scala
index fadbdfa32fd09..7c70e41911055 100644
--- a/core/src/main/scala/org/apache/spark/InternalAccumulator.scala
+++ b/core/src/main/scala/org/apache/spark/InternalAccumulator.scala
@@ -78,6 +78,9 @@ private[spark] object InternalAccumulator {
     val TOTAL_BLOOM_BLOCKS = INPUT_METRICS_PREFIX + "totalBloomBlocks"
     val FOOTER_READ_TIME = INPUT_METRICS_PREFIX + "footerReadTime"
     val FOOTER_READ_NUMBER = INPUT_METRICS_PREFIX + "footerReadNumber"
+    val TOTAL_PARQUET_PAGES = INPUT_METRICS_PREFIX + "totalPagesCount"
+    val FILTERED_PARQUET_PAGES = INPUT_METRICS_PREFIX + "filteredPagesCount"
+    val AFTER_FILTERED_PARQUET_PAGES = INPUT_METRICS_PREFIX + "afterFilterPagesCount"
   }
 
   // scalastyle:on
diff --git a/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala b/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala
index 3e3905df0d4cb..33ed4ffcfb7fc 100644
--- a/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala
@@ -47,6 +47,9 @@ class InputMetrics private[spark] () extends Serializable {
   private[executor] val _totalBloomBlocks = new LongAccumulator
   private[executor] val _footerReadTime = new LongAccumulator
   private[executor] val _footerReadNumber = new LongAccumulator
+  private[executor] val _totalPagesCount = new LongAccumulator
+  private[executor] val _filteredPagesCount = new LongAccumulator
+  private[executor] val _afterFilterPagesCount = new LongAccumulator
 
   /**
    * Total number of bytes read.
@@ -67,6 +70,9 @@ class InputMetrics private[spark] () extends Serializable {
 
   def footerReadTime: Long = _footerReadTime.sum
   def footerReadNumber: Long = _footerReadNumber.sum
+  def totalPagesCount: Long = _totalPagesCount.sum
+  def filteredPagesCount: Long = _filteredPagesCount.sum
+  def afterFilterPagesCount: Long = _afterFilterPagesCount.sum
 
   private[spark] def incBytesRead(v: Long): Unit = _bytesRead.add(v)
   private[spark] def incSkipBloomBlocks(v: Long): Unit = _skipBloomBlocks.add(v)
@@ -74,6 +80,9 @@ class InputMetrics private[spark] () extends Serializable {
   private[spark] def incTotalBloomBlocks(v: Long): Unit = _totalBloomBlocks.add(v)
   private[spark] def incFooterReadTime(v: Long): Unit = _footerReadTime.add(v)
   private[spark] def incFooterReadNumber(v: Long): Unit = _footerReadNumber.add(v)
+  private[spark] def incTotalPagesCount(v: Long): Unit = _totalPagesCount.add(v)
+  private[spark] def incFilteredPagesCount(v: Long): Unit = _filteredPagesCount.add(v)
+  private[spark] def incAfterFilterPagesCount(v: Long): Unit = _afterFilterPagesCount.add(v)
   private[spark] def incRecordsRead(v: Long): Unit = _recordsRead.add(v)
   private[spark] def setBytesRead(v: Long): Unit = _bytesRead.setValue(v)
 }
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 1a8809c3d584a..ac3cd91ed0944 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -237,6 +237,10 @@ class TaskMetrics private[spark] () extends Serializable {
     input.SKIP_BLOOM_ROWS -> inputMetrics._skipBloomRows,
     input.FOOTER_READ_TIME -> inputMetrics._footerReadTime,
     input.FOOTER_READ_NUMBER -> inputMetrics._footerReadNumber,
+    input.TOTAL_PARQUET_PAGES -> inputMetrics._totalPagesCount,
+    input.FILTERED_PARQUET_PAGES -> inputMetrics._filteredPagesCount,
+    input.AFTER_FILTERED_PARQUET_PAGES -> inputMetrics._afterFilterPagesCount,
+
     output.BYTES_WRITTEN -> outputMetrics._bytesWritten,
     output.RECORDS_WRITTEN -> outputMetrics._recordsWritten
   ) ++ testAccum.map(TEST_ACCUM -> _)
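Note (not part of the patch): with the three accumulators registered in TaskMetrics above, the per-task page counters become visible to the standard listener API on the driver. The sketch below assumes a build that contains this patch; ParquetPageMetricsListener is a hypothetical name, and only the totalPagesCount / filteredPagesCount / afterFilterPagesCount getters are non-stock.

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Logs the Parquet page counters of every finished task.
class ParquetPageMetricsListener extends SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val metrics = taskEnd.taskMetrics
    if (metrics != null) {  // task metrics can be missing for failed tasks
      val in = metrics.inputMetrics
      println(s"task=${taskEnd.taskInfo.taskId} " +
        s"totalPages=${in.totalPagesCount} " +
        s"filteredPages=${in.filteredPagesCount} " +
        s"afterFilterPages=${in.afterFilterPagesCount}")
    }
  }
}

// Registered once on the driver, e.g.:
//   spark.sparkContext.addSparkListener(new ParquetPageMetricsListener)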
diff --git a/pom.xml b/pom.xml
index 53c8661ee88c6..4e835de272288 100644
--- a/pom.xml
+++ b/pom.xml
@@ -138,7 +138,7 @@
     2.8.2
 
     10.14.2.0
-    1.12.2-kylin-r3
+    1.12.2-kylin-r5
     1.6.11
     9.4.49.v20220914
     7.0.0.pre5
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
index 9f7836ae4818d..6dfbb8fc1f38a 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
@@ -26,6 +26,7 @@
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.filter2.compat.QueryMetrics;
 import org.apache.parquet.schema.Type;
 
 import org.apache.spark.memory.MemoryMode;
@@ -341,4 +342,11 @@ private void checkEndOfRowGroup() throws IOException {
     }
     totalCountLoadedSoFar += pages.getRowCount();
   }
+  public QueryMetrics getParquetQueryMetrics() {
+    if (reader != null) {
+      return reader.queryMetrics;
+    } else {
+      return new QueryMetrics();
+    }
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index 99faef0864313..5fb60eba92fa5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -211,7 +211,20 @@ class FileScanRDD(
         }
       }
 
+      def collectQueryMetrics(): Unit = {
+        if (currentIterator != null && currentIterator.isInstanceOf[RecordReaderIterator[Object]]) {
+          val queryMetrics = currentIterator.asInstanceOf[RecordReaderIterator[Object]]
+            .getParquetQueryMetrics()
+          if (queryMetrics.getTotalPagesCount > 0) {
+            inputMetrics.incTotalPagesCount(queryMetrics.getTotalPagesCount)
+            inputMetrics.incFilteredPagesCount(queryMetrics.getFilteredPagesCount)
+            inputMetrics.incAfterFilterPagesCount(queryMetrics.getAfterFilterPagesCount)
+          }
+        }
+      }
+
       override def close(): Unit = {
+        collectQueryMetrics()
         incTaskInputMetricsBytesRead()
         InputFileBlockHolder.unset()
       }
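Note (not part of the patch): collectQueryMetrics() above can also be expressed with a type pattern instead of the isInstanceOf/asInstanceOf pair, which folds in the null check. This is a sketch only; it assumes the same enclosing scope as the added method (currentIterator and inputMetrics of the anonymous iterator in FileScanRDD.compute) and is intended to behave identically.

def collectQueryMetrics(): Unit = currentIterator match {
  case it: RecordReaderIterator[_] =>
    val queryMetrics = it.getParquetQueryMetrics()
    if (queryMetrics.getTotalPagesCount > 0) {
      inputMetrics.incTotalPagesCount(queryMetrics.getTotalPagesCount)
      inputMetrics.incFilteredPagesCount(queryMetrics.getFilteredPagesCount)
      inputMetrics.incAfterFilterPagesCount(queryMetrics.getAfterFilterPagesCount)
    }
  case _ => // null or not a RecordReaderIterator: nothing to collect
}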
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/RecordReaderIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/RecordReaderIterator.scala
index d8e30e600098d..a916c73e158ac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/RecordReaderIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/RecordReaderIterator.scala
@@ -20,8 +20,10 @@ package org.apache.spark.sql.execution.datasources
 import java.io.Closeable
 
 import org.apache.hadoop.mapreduce.RecordReader
+import org.apache.parquet.filter2.compat.QueryMetrics
 
 import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader
 
 /**
  * An adaptor from a Hadoop [[RecordReader]] to an [[Iterator]] over the values returned.
@@ -33,6 +35,7 @@ class RecordReaderIterator[T](
     private[this] var rowReader: RecordReader[_, T]) extends Iterator[T] with Closeable {
   private[this] var havePair = false
   private[this] var finished = false
+  private[this] var queryMetrics: QueryMetrics = _
 
   override def hasNext: Boolean = {
     if (!finished && !havePair) {
@@ -58,6 +61,7 @@
 
   override def close(): Unit = {
     if (rowReader != null) {
+      setParquetQueryMetrics()
       try {
         rowReader.close()
       } finally {
@@ -65,4 +69,17 @@
       }
     }
   }
+
+  private def setParquetQueryMetrics(): Unit = {
+    if (rowReader.isInstanceOf[VectorizedParquetRecordReader]) {
+      queryMetrics = rowReader.asInstanceOf[VectorizedParquetRecordReader].getParquetQueryMetrics()
+    }
+  }
+
+  def getParquetQueryMetrics(): QueryMetrics = {
+    if (queryMetrics == null) {
+      queryMetrics = new QueryMetrics
+    }
+    queryMetrics
+  }
 }
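Note (not part of the patch): a rough end-to-end way to exercise the new counters, assuming a build with this patch and the 1.12.2-kylin-r5 Parquet artifact. The output path is arbitrary, and whether filteredPagesCount comes out non-zero depends on the file layout and on page-level filtering actually engaging for the predicate.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").getOrCreate()
// Listener from the sketch after the core/ changes above (hypothetical name).
spark.sparkContext.addSparkListener(new ParquetPageMetricsListener)

// Write a Parquet table, then read it back with a selective predicate so that
// page-level filtering has a chance to skip pages.
spark.range(0L, 10000000L).selectExpr("id", "id % 100 AS bucket")
  .write.mode("overwrite").parquet("/tmp/page_metrics_demo")
spark.read.parquet("/tmp/page_metrics_demo").where("id = 12345").count()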