This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

Commit

[NSE-134] Update input metrics during reading (#135)
zhztheplayer authored Mar 4, 2021
1 parent e1e4b73 commit c4ff3bf
Showing 2 changed files with 51 additions and 7 deletions.
@@ -22,17 +22,14 @@ import com.intel.oap.vectorized._
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.util._
import org.apache.spark.sql.connector.read.{
InputPartition,
PartitionReader,
PartitionReaderFactory
}
import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}
import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile}
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}
import org.apache.spark.sql.execution.datasources.v2.VectorizedFilePartitionReaderHandler
import org.apache.spark.sql.execution.datasources.v2.arrow.SparkMemoryUtils
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetPartitionReaderFactory
import org.apache.spark.sql.util.OASPackageBridge._

class DataSourceRDDPartition(val index: Int, val inputPartition: InputPartition)
extends Partition
@@ -84,6 +81,8 @@ class ColumnarDataSourceRDD(
val rddId = this
SparkMemoryUtils.addLeakSafeTaskCompletionListener[Unit](_ => reader.close())
val iter = new Iterator[Any] {
private val inputMetrics = TaskContext.get().taskMetrics().inputMetrics

private[this] var valuePrepared = false

override def hasNext: Boolean = {
@@ -108,7 +107,21 @@
throw new java.util.NoSuchElementException("End of stream")
}
valuePrepared = false
reader.get()
val value = reader.get()
val bytes: Long = value match {
  case batch: ColumnarBatch =>
    (0 until batch.numCols()).map { i =>
      val vector = Option(batch.column(i))
      vector.map {
        case av: ArrowWritableColumnVector =>
          av.getValueVector.getBufferSize.toLong
        case _ => 0L
      }.sum
    }.sum
  case _ => 0L
}
inputMetrics.bridgeIncBytesRead(bytes)
value
}
}
val closeableColumnarBatchIterator = new CloseableColumnBatchIterator(
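For reference, the added next() logic can be read as a standalone helper: the size of each returned ColumnarBatch is estimated by summing the Arrow buffer sizes of the columns backed by ArrowWritableColumnVector, counting any other column type as zero bytes. A minimal sketch of that calculation (the object name BatchBytesEstimator is illustrative, not part of the commit):

import com.intel.oap.vectorized.ArrowWritableColumnVector
import org.apache.spark.sql.vectorized.ColumnarBatch

// Illustrative helper mirroring the size estimation above: sum the Arrow
// buffer sizes of the columns that are ArrowWritableColumnVectors.
object BatchBytesEstimator {
  def estimatedBytes(batch: ColumnarBatch): Long =
    (0 until batch.numCols()).map { i =>
      batch.column(i) match {
        case av: ArrowWritableColumnVector => av.getValueVector.getBufferSize.toLong
        case _ => 0L
      }
    }.sum
}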
@@ -0,0 +1,31 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.util

import org.apache.spark.executor.InputMetrics

/**
 * Bridge to package org.apache.spark.
 */
object OASPackageBridge {
  implicit class InputMetricsWrapper(val m: InputMetrics) {
    def bridgeIncBytesRead(v: Long): Unit = {
      m.incBytesRead(v)
    }
  }
}
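Spark declares InputMetrics.incBytesRead as private[spark], so code outside the org.apache.spark package cannot call it directly; this object lives under org.apache.spark.sql.util and re-exposes the method through an implicit wrapper. A minimal usage sketch mirroring the call site in ColumnarDataSourceRDD above (variable names are illustrative, and the snippet only works inside a running Spark task):

import org.apache.spark.TaskContext
import org.apache.spark.sql.util.OASPackageBridge._

// Illustrative call site: add the bytes materialized for the current batch
// to the task's input metrics. TaskContext.get() is non-null only inside a task.
val bytesJustRead: Long = 4096L // placeholder value
val inputMetrics = TaskContext.get().taskMetrics().inputMetrics
inputMetrics.bridgeIncBytesRead(bytesJustRead)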
