From c4ff3bf30e385ed907883dde37ff452656b67f33 Mon Sep 17 00:00:00 2001
From: Hongze Zhang
Date: Thu, 4 Mar 2021 08:14:27 +0800
Subject: [PATCH] [NSE-134] Update input metrics during reading (#135)

---
 .../oap/execution/ColumnarDataSourceRDD.scala | 27 +++++++++++-----
 .../spark/sql/util/OASPackageBridge.scala     | 31 +++++++++++++++++++
 2 files changed, 51 insertions(+), 7 deletions(-)
 create mode 100644 core/src/main/scala/org/apache/spark/sql/util/OASPackageBridge.scala

diff --git a/core/src/main/scala/com/intel/oap/execution/ColumnarDataSourceRDD.scala b/core/src/main/scala/com/intel/oap/execution/ColumnarDataSourceRDD.scala
index e9e23021f..8b63fa897 100644
--- a/core/src/main/scala/com/intel/oap/execution/ColumnarDataSourceRDD.scala
+++ b/core/src/main/scala/com/intel/oap/execution/ColumnarDataSourceRDD.scala
@@ -22,17 +22,14 @@ import com.intel.oap.vectorized._
 import org.apache.spark._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.util._
-import org.apache.spark.sql.connector.read.{
-  InputPartition,
-  PartitionReader,
-  PartitionReaderFactory
-}
+import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}
 import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile}
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
-import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
+import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}
 import org.apache.spark.sql.execution.datasources.v2.VectorizedFilePartitionReaderHandler
 import org.apache.spark.sql.execution.datasources.v2.arrow.SparkMemoryUtils
 import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetPartitionReaderFactory
+import org.apache.spark.sql.util.OASPackageBridge._
 
 class DataSourceRDDPartition(val index: Int, val inputPartition: InputPartition)
   extends Partition
@@ -84,6 +81,8 @@
     val rddId = this
     SparkMemoryUtils.addLeakSafeTaskCompletionListener[Unit](_ => reader.close())
     val iter = new Iterator[Any] {
+      private val inputMetrics = TaskContext.get().taskMetrics().inputMetrics
+
       private[this] var valuePrepared = false
 
       override def hasNext: Boolean = {
@@ -108,7 +107,21 @@
           throw new java.util.NoSuchElementException("End of stream")
         }
         valuePrepared = false
-        reader.get()
+        val value = reader.get()
+        val bytes: Long = value match {
+          case batch: ColumnarBatch =>
+            (0 until batch.numCols()).map { i =>
+              val vector = Option(batch.column(i))
+              vector.map {
+                case av: ArrowWritableColumnVector =>
+                  av.getValueVector.getBufferSize.toLong
+                case _ => 0L
+              }.sum
+            }.sum
+          case _ => 0L
+        }
+        inputMetrics.bridgeIncBytesRead(bytes)
+        value
       }
     }
     val closeableColumnarBatchIterator = new CloseableColumnBatchIterator(
diff --git a/core/src/main/scala/org/apache/spark/sql/util/OASPackageBridge.scala b/core/src/main/scala/org/apache/spark/sql/util/OASPackageBridge.scala
new file mode 100644
index 000000000..6828989e3
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/sql/util/OASPackageBridge.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.util
+
+import org.apache.spark.executor.InputMetrics
+
+/**
+ * Bridge to package org.apache.spark.
+ */
+object OASPackageBridge {
+  implicit class InputMetricsWrapper(val m: InputMetrics) {
+    def bridgeIncBytesRead(v: Long): Unit = {
+      m.incBytesRead(v)
+    }
+  }
+}
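
Note on the byte accounting in ColumnarDataSourceRDD (not part of the commit):
the patch estimates a batch's size as the sum of the Arrow buffer sizes of its
columns; any column that is not an ArrowWritableColumnVector contributes 0.
Factored out of the iterator, the same logic reads as the sketch below, where
estimateBatchBytes is an illustrative name, not something the patch defines:

    import com.intel.oap.vectorized.ArrowWritableColumnVector
    import org.apache.spark.sql.vectorized.ColumnarBatch

    // Sum getBufferSize over every Arrow-backed column of a batch.
    def estimateBatchBytes(batch: ColumnarBatch): Long =
      (0 until batch.numCols()).map { i =>
        batch.column(i) match {
          case av: ArrowWritableColumnVector =>
            av.getValueVector.getBufferSize.toLong // bytes held by the Arrow buffers
          case _ => 0L // non-Arrow vectors are not measured
        }
      }.sum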
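
Note on OASPackageBridge (not part of the commit): InputMetrics.incBytesRead
is private[spark], so code under com.intel.oap cannot call it directly.
Declaring the bridge inside the org.apache.spark.sql.util package and exposing
the call through an implicit class makes the counter reachable, which is what
the reader's next() relies on. A minimal usage sketch, assuming it runs inside
an active Spark task:

    import org.apache.spark.TaskContext
    import org.apache.spark.sql.util.OASPackageBridge._

    // The implicit InputMetricsWrapper forwards to the package-private method.
    val inputMetrics = TaskContext.get().taskMetrics().inputMetrics
    inputMetrics.bridgeIncBytesRead(1024L) // record 1 KiB as read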