KE-40433 add page index filter log (apache#619) (apache#624)
* KE-40433 add page index filter log

* KE-40433 update parquet version
ygjia authored May 11, 2023
1 parent ecf9e74 commit 19fef37
Showing 7 changed files with 55 additions and 1 deletion.
@@ -78,6 +78,9 @@ private[spark] object InternalAccumulator {
val TOTAL_BLOOM_BLOCKS = INPUT_METRICS_PREFIX + "totalBloomBlocks"
val FOOTER_READ_TIME = INPUT_METRICS_PREFIX + "footerReadTime"
val FOOTER_READ_NUMBER = INPUT_METRICS_PREFIX + "footerReadNumber"
val TOTAL_PARQUET_PAGES = INPUT_METRICS_PREFIX + "totalPagesCount"
val FILTERED_PARQUET_PAGES = INPUT_METRICS_PREFIX + "filteredPagesCount"
val AFTER_FILTERED_PARQUET_PAGES = INPUT_METRICS_PREFIX + "afterFilterPagesCount"
}

// scalastyle:on
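With the stock Spark prefix constants (METRICS_PREFIX = "internal.metrics.", INPUT_METRICS_PREFIX = METRICS_PREFIX + "input.", neither of which appears in this hunk), the three new keys resolve to the fully qualified accumulator names below, which is how the counters are expected to show up in accumulator updates and the event log. A small sketch, assuming those standard prefix values:

// Assuming the stock Spark prefixes (not part of this diff):
val expectedAccumulatorNames = Seq(
  "internal.metrics.input.totalPagesCount",
  "internal.metrics.input.filteredPagesCount",
  "internal.metrics.input.afterFilterPagesCount"
)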
@@ -47,6 +47,9 @@ class InputMetrics private[spark] () extends Serializable {
private[executor] val _totalBloomBlocks = new LongAccumulator
private[executor] val _footerReadTime = new LongAccumulator
private[executor] val _footerReadNumber = new LongAccumulator
private[executor] val _totalPagesCount = new LongAccumulator
private[executor] val _filteredPagesCount = new LongAccumulator
private[executor] val _afterFilterPagesCount = new LongAccumulator

/**
* Total number of bytes read.
@@ -67,13 +70,19 @@
def footerReadTime: Long = _footerReadTime.sum

def footerReadNumber: Long = _footerReadNumber.sum
def totalPagesCount: Long = _totalPagesCount.sum
def filteredPagesCount: Long = _filteredPagesCount.sum
def afterFilterPagesCount: Long = _afterFilterPagesCount.sum

private[spark] def incBytesRead(v: Long): Unit = _bytesRead.add(v)
private[spark] def incSkipBloomBlocks(v: Long): Unit = _skipBloomBlocks.add(v)
private[spark] def incSkipRows(v: Long): Unit = _skipBloomRows.add(v)
private[spark] def incTotalBloomBlocks(v: Long): Unit = _totalBloomBlocks.add(v)
private[spark] def incFooterReadTime(v: Long): Unit = _footerReadTime.add(v)
private[spark] def incFooterReadNumber(v: Long): Unit = _footerReadNumber.add(v)
private[spark] def incTotalPagesCount(v: Long): Unit = _totalPagesCount.add(v)
private[spark] def incFilteredPagesCount(v: Long): Unit = _filteredPagesCount.add(v)
private[spark] def incAfterFilterPagesCount(v: Long): Unit = _afterFilterPagesCount.add(v)
private[spark] def incRecordsRead(v: Long): Unit = _recordsRead.add(v)
private[spark] def setBytesRead(v: Long): Unit = _bytesRead.setValue(v)
}
@@ -237,6 +237,10 @@ class TaskMetrics private[spark] () extends Serializable {
input.SKIP_BLOOM_ROWS -> inputMetrics._skipBloomRows,
input.FOOTER_READ_TIME -> inputMetrics._footerReadTime,
input.FOOTER_READ_NUMBER -> inputMetrics._footerReadNumber,
input.TOTAL_PARQUET_PAGES -> inputMetrics._totalPagesCount,
input.FILTERED_PARQUET_PAGES -> inputMetrics._filteredPagesCount,
input.AFTER_FILTERED_PARQUET_PAGES -> inputMetrics._afterFilterPagesCount,

output.BYTES_WRITTEN -> outputMetrics._bytesWritten,
output.RECORDS_WRITTEN -> outputMetrics._recordsWritten
) ++ testAccum.map(TEST_ACCUM -> _)
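Registering the counters in this name-to-accumulator map is what ships them back to the driver with every task, so a listener can read them off SparkListenerTaskEnd. A minimal sketch, assuming only the getters added to InputMetrics above plus the standard SparkListener API; the listener class name and log format are hypothetical, and the values are only populated once FileScanRDD.collectQueryMetrics() (further down) has run:

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import org.slf4j.LoggerFactory

// Illustrative listener, not part of this commit: logs the page-index counters per finished task.
class PageIndexFilterListener extends SparkListener {
  private val log = LoggerFactory.getLogger(classOf[PageIndexFilterListener])

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val tm = taskEnd.taskMetrics
    if (tm != null && tm.inputMetrics.totalPagesCount > 0) {
      log.info(s"task ${taskEnd.taskInfo.taskId}: parquet pages " +
        s"total=${tm.inputMetrics.totalPagesCount}, " +
        s"filtered=${tm.inputMetrics.filteredPagesCount}, " +
        s"remaining=${tm.inputMetrics.afterFilterPagesCount}")
    }
  }
}

It would be registered from application code with sparkContext.addSparkListener(new PageIndexFilterListener).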
pom.xml (1 addition & 1 deletion)
@@ -138,7 +138,7 @@
<kafka.version>2.8.2</kafka.version>
<!-- After 10.15.1.3, the minimum required version is JDK9 -->
<derby.version>10.14.2.0</derby.version>
-<parquet.version>1.12.2-kylin-r3</parquet.version>
+<parquet.version>1.12.2-kylin-r5</parquet.version>
<orc.version>1.6.11</orc.version>
<jetty.version>9.4.49.v20220914</jetty.version>
<mortbay.jetty.version>7.0.0.pre5</mortbay.jetty.version>
@@ -26,6 +26,7 @@
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.filter2.compat.QueryMetrics;
import org.apache.parquet.schema.Type;

import org.apache.spark.memory.MemoryMode;
@@ -341,4 +342,11 @@ private void checkEndOfRowGroup() throws IOException {
    }
    totalCountLoadedSoFar += pages.getRowCount();
  }
  public QueryMetrics getParquetQueryMetrics() {
    if (reader != null) {
      return reader.queryMetrics;
    } else {
      return new QueryMetrics();
    }
  }
}
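QueryMetrics here comes from the Kyligence-patched parquet-mr that the pom.xml bump above pulls in; this commit relies only on its three page counters. As a hedged illustration of how the new accessor can be used, a hypothetical helper (its name and output format are made up) that renders the same counters FileScanRDD later feeds into InputMetrics:

import org.apache.parquet.filter2.compat.QueryMetrics
import org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader

// Hypothetical helper, not in the commit: summarizes page-index filtering for one reader.
object PageIndexSummary {
  def apply(reader: VectorizedParquetRecordReader): String = {
    val qm: QueryMetrics = reader.getParquetQueryMetrics()
    s"parquet pages: total=${qm.getTotalPagesCount}, filtered=${qm.getFilteredPagesCount}, " +
      s"read after filtering=${qm.getAfterFilterPagesCount}"
  }
}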
@@ -211,7 +211,20 @@ class FileScanRDD(
        }
      }

      def collectQueryMetrics(): Unit = {
        if (currentIterator != null && currentIterator.isInstanceOf[RecordReaderIterator[Object]]) {
          val queryMetrics = currentIterator.asInstanceOf[RecordReaderIterator[Object]]
            .getParquetQueryMetrics()
          if (queryMetrics.getTotalPagesCount > 0) {
            inputMetrics.incTotalPagesCount(queryMetrics.getTotalPagesCount)
            inputMetrics.incFilteredPagesCount(queryMetrics.getFilteredPagesCount)
            inputMetrics.incAfterFilterPagesCount(queryMetrics.getAfterFilterPagesCount)
          }
        }
      }

      override def close(): Unit = {
        collectQueryMetrics()
        incTaskInputMetricsBytesRead()
        InputFileBlockHolder.unset()
      }
@@ -20,8 +20,10 @@ package org.apache.spark.sql.execution.datasources
import java.io.Closeable

import org.apache.hadoop.mapreduce.RecordReader
import org.apache.parquet.filter2.compat.QueryMetrics

import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader

/**
* An adaptor from a Hadoop [[RecordReader]] to an [[Iterator]] over the values returned.
@@ -33,6 +35,7 @@ class RecordReaderIterator[T](
    private[this] var rowReader: RecordReader[_, T]) extends Iterator[T] with Closeable {
  private[this] var havePair = false
  private[this] var finished = false
  private[this] var queryMetrics: QueryMetrics = _

  override def hasNext: Boolean = {
    if (!finished && !havePair) {
@@ -58,11 +61,25 @@

  override def close(): Unit = {
    if (rowReader != null) {
      setParquetQueryMetrics()
      try {
        rowReader.close()
      } finally {
        rowReader = null
      }
    }
  }

  private def setParquetQueryMetrics(): Unit = {
    if (rowReader.isInstanceOf[VectorizedParquetRecordReader]) {
      queryMetrics = rowReader.asInstanceOf[VectorizedParquetRecordReader].getParquetQueryMetrics()
    }
  }

  def getParquetQueryMetrics(): QueryMetrics = {
    if (queryMetrics == null) {
      queryMetrics = new QueryMetrics
    }
    queryMetrics
  }
}
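Note the ordering this implies: queryMetrics is captured only inside close(), and only when the underlying reader is a VectorizedParquetRecordReader; before close(), getParquetQueryMetrics() returns an empty QueryMetrics. A usage sketch under those assumptions, with a hypothetical process callback and an already-initialized reader:

import org.apache.spark.sql.execution.datasources.RecordReaderIterator
import org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader

// Sketch only: `reader` is assumed to be an initialized VectorizedParquetRecordReader.
def consume(reader: VectorizedParquetRecordReader, process: AnyRef => Unit): Unit = {
  val iter = new RecordReaderIterator(reader)
  try {
    iter.foreach(process)
  } finally {
    iter.close() // snapshots reader.queryMetrics before the reader is released
    val qm = iter.getParquetQueryMetrics()
    println(s"pages: total=${qm.getTotalPagesCount}, filtered=${qm.getFilteredPagesCount}")
  }
}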
