
cherry-pick spark3.2 KE commit
KE-40433 add page index filter log (apache#619) (apache#624)

* KE-40433 add page index filter log

* KE-40433 update parquet version
ygjia authored and zheniantoushipashi committed Aug 21, 2023
1 parent baf939f commit 0e4f271
Showing 9 changed files with 99 additions and 4 deletions.
@@ -73,6 +73,14 @@ private[spark] object InternalAccumulator {
object input {
val BYTES_READ = INPUT_METRICS_PREFIX + "bytesRead"
val RECORDS_READ = INPUT_METRICS_PREFIX + "recordsRead"
val SKIP_BLOOM_BLOCKS = INPUT_METRICS_PREFIX + "skipBloomBlocks"
val SKIP_BLOOM_ROWS = INPUT_METRICS_PREFIX + "skipBloomRows"
val TOTAL_BLOOM_BLOCKS = INPUT_METRICS_PREFIX + "totalBloomBlocks"
val FOOTER_READ_TIME = INPUT_METRICS_PREFIX + "footerReadTime"
val FOOTER_READ_NUMBER = INPUT_METRICS_PREFIX + "footerReadNumber"
val TOTAL_PARQUET_PAGES = INPUT_METRICS_PREFIX + "totalPagesCount"
val FILTERED_PARQUET_PAGES = INPUT_METRICS_PREFIX + "filteredPagesCount"
val AFTER_FILTERED_PARQUET_PAGES = INPUT_METRICS_PREFIX + "afterFilterPagesCount"
}

// scalastyle:on
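The block above only registers name constants. In upstream Spark, INPUT_METRICS_PREFIX resolves to "internal.metrics.input.", so the new counters end up published under fully qualified metric names. A minimal sketch of the resulting keys (the prefix value is an assumption based on upstream Spark, not something shown in this diff):

// Assumed: INPUT_METRICS_PREFIX == "internal.metrics." + "input." in upstream Spark.
val prefix = "internal.metrics.input."
val newPageIndexKeys = Seq(
  prefix + "totalPagesCount",        // TOTAL_PARQUET_PAGES
  prefix + "filteredPagesCount",     // FILTERED_PARQUET_PAGES
  prefix + "afterFilterPagesCount")  // AFTER_FILTERED_PARQUET_PAGES
// These are the names under which the counters appear in event logs and in
// per-task AccumulableInfo entries.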
29 changes: 29 additions & 0 deletions core/src/main/scala/org/apache/spark/executor/InputMetrics.scala
@@ -42,6 +42,14 @@ object DataReadMethod extends Enumeration with Serializable {
class InputMetrics private[spark] () extends Serializable {
private[executor] val _bytesRead = new LongAccumulator
private[executor] val _recordsRead = new LongAccumulator
private[executor] val _skipBloomBlocks = new LongAccumulator
private[executor] val _skipBloomRows = new LongAccumulator
private[executor] val _totalBloomBlocks = new LongAccumulator
private[executor] val _footerReadTime = new LongAccumulator
private[executor] val _footerReadNumber = new LongAccumulator
private[executor] val _totalPagesCount = new LongAccumulator
private[executor] val _filteredPagesCount = new LongAccumulator
private[executor] val _afterFilterPagesCount = new LongAccumulator

/**
* Total number of bytes read.
@@ -53,7 +61,28 @@ class InputMetrics private[spark] () extends Serializable {
*/
def recordsRead: Long = _recordsRead.sum

def totalBloomBlocks: Long = _totalBloomBlocks.sum

def totalSkipBloomBlocks: Long = _skipBloomBlocks.sum

def totalSkipBloomRows: Long = _skipBloomRows.sum

def footerReadTime: Long = _footerReadTime.sum

def footerReadNumber: Long = _footerReadNumber.sum
def totalPagesCount: Long = _totalPagesCount.sum
def filteredPagesCount: Long = _filteredPagesCount.sum
def afterFilterPagesCount: Long = _afterFilterPagesCount.sum

private[spark] def incBytesRead(v: Long): Unit = _bytesRead.add(v)
private[spark] def incSkipBloomBlocks(v: Long): Unit = _skipBloomBlocks.add(v)
private[spark] def incSkipRows(v: Long): Unit = _skipBloomRows.add(v)
private[spark] def incTotalBloomBlocks(v: Long): Unit = _totalBloomBlocks.add(v)
private[spark] def incFooterReadTime(v: Long): Unit = _footerReadTime.add(v)
private[spark] def incFooterReadNumber(v: Long): Unit = _footerReadNumber.add(v)
private[spark] def incTotalPagesCount(v: Long): Unit = _totalPagesCount.add(v)
private[spark] def incFilteredPagesCount(v: Long): Unit = _filteredPagesCount.add(v)
private[spark] def incAfterFilterPagesCount(v: Long): Unit = _afterFilterPagesCount.add(v)
private[spark] def incRecordsRead(v: Long): Unit = _recordsRead.add(v)
private[spark] def setBytesRead(v: Long): Unit = _bytesRead.setValue(v)
}
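
The new inc* setters are private[spark], so only Spark-internal code (such as the FileScanRDD change later in this commit) can drive them. A minimal sketch of a caller, assuming it is compiled into a package under org.apache.spark; the object name is hypothetical:

package org.apache.spark.sql.execution.datasources

import org.apache.spark.TaskContext

// Hypothetical helper mirroring how FileScanRDD feeds the page-index counters
// into the running task's InputMetrics.
object PageIndexMetricsSink {
  def record(total: Long, filtered: Long, remaining: Long): Unit = {
    // TaskContext.get() returns null outside a task, hence the Option guard.
    Option(TaskContext.get()).foreach { ctx =>
      val in = ctx.taskMetrics().inputMetrics
      in.incTotalPagesCount(total)
      in.incFilteredPagesCount(filtered)
      in.incAfterFilterPagesCount(remaining)
    }
  }
}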
@@ -232,6 +232,15 @@ class TaskMetrics private[spark] () extends Serializable {
shuffleWrite.WRITE_TIME -> shuffleWriteMetrics._writeTime,
input.BYTES_READ -> inputMetrics._bytesRead,
input.RECORDS_READ -> inputMetrics._recordsRead,
input.TOTAL_BLOOM_BLOCKS -> inputMetrics._totalBloomBlocks,
input.SKIP_BLOOM_BLOCKS -> inputMetrics._skipBloomBlocks,
input.SKIP_BLOOM_ROWS -> inputMetrics._skipBloomRows,
input.FOOTER_READ_TIME -> inputMetrics._footerReadTime,
input.FOOTER_READ_NUMBER -> inputMetrics._footerReadNumber,
input.TOTAL_PARQUET_PAGES -> inputMetrics._totalPagesCount,
input.FILTERED_PARQUET_PAGES -> inputMetrics._filteredPagesCount,
input.AFTER_FILTERED_PARQUET_PAGES -> inputMetrics._afterFilterPagesCount,

output.BYTES_WRITTEN -> outputMetrics._bytesWritten,
output.RECORDS_WRITTEN -> outputMetrics._recordsWritten
) ++ testAccum.map(TEST_ACCUM -> _)
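Registering the accumulators under these names means the counters travel with each task's metric updates and can be read back from TaskMetrics on the driver. A sketch of a consumer (the listener name is hypothetical; it assumes this fork, which adds the public getters above, is on the classpath):

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

class PageIndexFilterListener extends SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    // taskMetrics can be null for failed tasks, hence the Option guard.
    Option(taskEnd.taskMetrics).map(_.inputMetrics).foreach { in =>
      if (in.totalPagesCount > 0) {
        println(s"stage=${taskEnd.stageId} parquet pages: total=${in.totalPagesCount} " +
          s"filtered=${in.filteredPagesCount} remaining=${in.afterFilterPagesCount}")
      }
    }
  }
}

Register it with spark.sparkContext.addSparkListener(new PageIndexFilterListener) or via the spark.extraListeners configuration.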
2 changes: 1 addition & 1 deletion pom.xml
@@ -133,7 +133,7 @@
<kafka.version>2.8.2</kafka.version>
<!-- After 10.15.1.3, the minimum required version is JDK9 -->
<derby.version>10.14.2.0</derby.version>
- <parquet.version>1.12.2-kylin-r1</parquet.version>
+ <parquet.version>1.12.2-kylin-r5</parquet.version>
<orc.version>1.6.11</orc.version>
<jetty.version>9.4.49.v20220914</jetty.version>
<jakartaservlet.version>4.0.3</jakartaservlet.version>
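The bump from 1.12.2-kylin-r1 to 1.12.2-kylin-r5 pulls in the Kylin parquet-mr build that exposes the QueryMetrics counters used below. A quick way to check which parquet-hadoop build actually landed on the classpath (standard Java reflection; may print null if the jar manifest does not set Implementation-Version):

println(classOf[org.apache.parquet.hadoop.ParquetFileReader]
  .getPackage.getImplementationVersion)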
@@ -254,11 +254,21 @@ interface ParquetRowGroupReader extends Closeable {
* Reads the next row group from this reader. Returns null if there is no more row group.
*/
PageReadStore readNextRowGroup() throws IOException;

/**
* Returns the underlying file reader.
*/
ParquetFileReader getFileReader();
}

private static class ParquetRowGroupReaderImpl implements ParquetRowGroupReader {
private final ParquetFileReader reader;

@Override
public ParquetFileReader getFileReader() {
return reader;
}

ParquetRowGroupReaderImpl(ParquetFileReader reader) {
this.reader = reader;
}
@@ -30,6 +30,7 @@
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.filter2.compat.QueryMetrics;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;
@@ -396,6 +397,14 @@ private void checkEndOfRowGroup() throws IOException {
totalCountLoadedSoFar += pages.getRowCount();
}

public QueryMetrics getParquetQueryMetrics() {
  if (reader != null) {
    return reader.getFileReader().queryMetrics;
  } else {
    return new QueryMetrics();
  }
}

private void initColumnReader(PageReadStore pages, ParquetColumnVector cv) throws IOException {
if (!missingColumns.contains(cv.getColumn())) {
if (cv.getColumn().isPrimitive()) {
@@ -298,7 +298,20 @@ class FileScanRDD(
}
}

def collectQueryMetrics(): Unit = {
  if (currentIterator != null && currentIterator.isInstanceOf[RecordReaderIterator[Object]]) {
    val queryMetrics = currentIterator.asInstanceOf[RecordReaderIterator[Object]]
      .getParquetQueryMetrics()
    if (queryMetrics.getTotalPagesCount > 0) {
      inputMetrics.incTotalPagesCount(queryMetrics.getTotalPagesCount)
      inputMetrics.incFilteredPagesCount(queryMetrics.getFilteredPagesCount)
      inputMetrics.incAfterFilterPagesCount(queryMetrics.getAfterFilterPagesCount)
    }
  }
}

override def close(): Unit = {
collectQueryMetrics()
incTaskInputMetricsBytesRead()
InputFileBlockHolder.unset()
resetCurrentIterator()
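collectQueryMetrics() runs from the iterator's close() path and copies the counters of the current file reader into the task's InputMetrics. An end-to-end sketch under assumed settings and a hypothetical dataset path; Parquet column-index (page-index) filtering is on by default in parquet-mr 1.11+, and the property is set here only to make the assumption explicit:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("page-index-filter-demo").getOrCreate()
import spark.implicits._

// parquet.filter.columnindex.enabled is the standard parquet-mr switch for page-index filtering.
spark.sparkContext.hadoopConfiguration
  .set("parquet.filter.columnindex.enabled", "true")

spark.read.parquet("/tmp/events")    // hypothetical dataset
  .filter($"event_id" === 42L)       // selective predicate so page pruning can kick in
  .count()

// Afterwards the per-task counters (totalPagesCount, filteredPagesCount,
// afterFilterPagesCount) are visible on TaskMetrics.inputMetrics, e.g. with the
// listener sketched earlier.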
@@ -20,8 +20,10 @@ package org.apache.spark.sql.execution.datasources
import java.io.Closeable

import org.apache.hadoop.mapreduce.RecordReader
import org.apache.parquet.filter2.compat.QueryMetrics

import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader

/**
* An adaptor from a Hadoop [[RecordReader]] to an [[Iterator]] over the values returned.
@@ -33,6 +35,7 @@ class RecordReaderIterator[T](
private[this] var rowReader: RecordReader[_, T]) extends Iterator[T] with Closeable {
private[this] var havePair = false
private[this] var finished = false
private[this] var queryMetrics: QueryMetrics = _

override def hasNext: Boolean = {
if (!finished && !havePair) {
@@ -65,11 +68,25 @@

override def close(): Unit = {
if (rowReader != null) {
setParquetQueryMetrics()
try {
rowReader.close()
} finally {
rowReader = null
}
}
}

private def setParquetQueryMetrics(): Unit = {
if (rowReader.isInstanceOf[VectorizedParquetRecordReader]) {
queryMetrics = rowReader.asInstanceOf[VectorizedParquetRecordReader].getParquetQueryMetrics()
}
}

def getParquetQueryMetrics(): QueryMetrics = {
if (queryMetrics == null) {
queryMetrics = new QueryMetrics
}
queryMetrics
}
}
@@ -18,19 +18,17 @@
package org.apache.spark.sql.execution.datasources.parquet

import java.util.{Optional, PrimitiveIterator}

import scala.collection.mutable.ArrayBuffer
import scala.language.implicitConversions

import org.apache.parquet.column.{ColumnDescriptor, ParquetProperties}
import org.apache.parquet.column.impl.ColumnWriteStoreV1
import org.apache.parquet.column.page._
import org.apache.parquet.column.page.mem.MemPageStore
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.io.ParquetDecodingException
import org.apache.parquet.io.api.Binary
import org.apache.parquet.schema.{MessageType, MessageTypeParser}
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName

import org.apache.spark.memory.MemoryMode
import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.catalyst.InternalRow
@@ -689,6 +687,8 @@ class ParquetVectorizedSuite extends QueryTest with ParquetTest with SharedSpark
}

override def close(): Unit = {}

override def getFileReader: ParquetFileReader = null
}

private case class TestPageReadStore(
