This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

Commit 8c75250: Adding columnar RDD cache support

xuechendi committed Apr 15, 2021 (1 parent: 9dbfc79)
Signed-off-by: Chendi Xue <[email protected]>

Showing 8 changed files with 1,029 additions and 48 deletions.
@@ -69,10 +69,7 @@ case class ColumnarGuardRule(conf: SparkConf) extends Rule[SparkPlan] {
         }
         plan
       case plan: InMemoryTableScanExec =>
-        if (plan.supportsColumnar) {
-          return false
-        }
-        plan
+        new ColumnarInMemoryTableScanExec(plan.attributes, plan.predicates, plan.relation)
       case plan: ProjectExec =>
         new ColumnarConditionProjectExec(null, plan.projectList, plan.child)
       case plan: FilterExec =>
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.adaptive._
 import org.apache.spark.sql.execution.aggregate.HashAggregateExec
+import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
 import org.apache.spark.sql.execution.exchange._
 import org.apache.spark.sql.execution.joins._
@@ -61,6 +62,9 @@ case class ColumnarPreOverrides(conf: SparkConf) extends Rule[SparkPlan] {
       case plan: BatchScanExec =>
         logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
         new ColumnarBatchScanExec(plan.output, plan.scan)
+      case plan: InMemoryTableScanExec =>
+        logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
+        new ColumnarInMemoryTableScanExec(plan.attributes, plan.predicates, plan.relation)
       case plan: ProjectExec =>
         val columnarChild = replaceWithColumnarPlan(plan.child)
         logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
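As context only (not part of this commit), here is a minimal sketch of how a physical-plan rule such as ColumnarPreOverrides is typically installed through Spark's SparkSessionExtensions columnar hook. The ExampleColumnarRule and ExampleExtensions names are illustrative, not the plugin's actual entry points.

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSessionExtensions
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{ColumnarRule, SparkPlan}
// ColumnarPreOverrides is assumed to be on the classpath; its package line is
// not shown in this diff, so its import is omitted here.

// Illustrative wrapper: run the replacement rule before Spark inserts
// row-to-columnar transitions around the plan.
case class ExampleColumnarRule(conf: SparkConf) extends ColumnarRule {
  override def preColumnarTransitions: Rule[SparkPlan] = ColumnarPreOverrides(conf)
}

// Registered by pointing spark.sql.extensions at this class (illustrative name).
class ExampleExtensions extends (SparkSessionExtensions => Unit) {
  override def apply(extensions: SparkSessionExtensions): Unit =
    extensions.injectColumnar(session => ExampleColumnarRule(session.sparkContext.getConf))
}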
@@ -0,0 +1,172 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.intel.oap.execution

import org.apache.commons.lang3.StringUtils

import com.intel.oap.expression._
import com.intel.oap.vectorized.ArrowWritableColumnVector
import com.intel.oap.vectorized.CloseableColumnBatchIterator
import org.apache.arrow.memory.ArrowBuf
import org.apache.spark.TaskContext
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.{logical, QueryPlan}
import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan, Statistics}
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.columnar.{
CachedBatch,
CachedBatchSerializer,
SimpleMetricsCachedBatch,
SimpleMetricsCachedBatchSerializer
}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.vectorized.{WritableColumnVector}
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.{LongAccumulator, Utils}
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

/**
* An Arrow-based implementation of CachedBatch: the whole batch is held as one
* serialized Arrow buffer.
*
* @param numRows The total number of rows in this batch
* @param buffer The serialized Arrow buffer for all columns of this batch
* @param stats The stats of the columns
*/
case class ArrowCachedBatch(numRows: Int, buffer: Array[Byte], stats: InternalRow = null)
extends SimpleMetricsCachedBatch {
override def sizeInBytes: Long = buffer.size
}

/**
* An Arrow-based implementation of CachedBatchSerializer, caching data as ArrowCachedBatch.
*/
class ArrowColumnarCachedBatchSerializer extends SimpleMetricsCachedBatchSerializer {
override def supportsColumnarInput(schema: Seq[Attribute]): Boolean = true

override def convertColumnarBatchToCachedBatch(
input: RDD[ColumnarBatch],
schema: Seq[Attribute],
storageLevel: StorageLevel,
conf: SQLConf): RDD[CachedBatch] = {
val batchSize = conf.columnBatchSize
val useCompression = conf.useCompression
convertForCacheInternal(input, schema, batchSize, useCompression)
}

override def convertInternalRowToCachedBatch(
input: RDD[InternalRow],
schema: Seq[Attribute],
storageLevel: StorageLevel,
conf: SQLConf): RDD[CachedBatch] =
throw new IllegalStateException("InternalRow input is not supported")

def convertForCacheInternal(
input: RDD[ColumnarBatch],
output: Seq[Attribute],
batchSize: Int,
useCompression: Boolean): RDD[CachedBatch] = {
input.mapPartitions { iter =>
var processed = false
new Iterator[ArrowCachedBatch] {
def next(): ArrowCachedBatch = {
processed = true
var _numRows: Int = 0
val _input = new ArrayBuffer[ColumnarBatch]()
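// Drain the whole partition in one pass: retain each Arrow column vector so the
// data stays valid after the source iterator advances, then serialize all the
// collected batches into a single contiguous buffer and release the originals.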
while (iter.hasNext) {
val batch = iter.next
if (batch.numRows > 0) {
(0 until batch.numCols).foreach(i =>
batch.column(i).asInstanceOf[ArrowWritableColumnVector].retain())
_numRows += batch.numRows
_input += batch
}
}
val cacheBuffer = ConverterUtils.convertToNetty(_input.toArray)
_input.foreach(_.close)

ArrowCachedBatch(_numRows, cacheBuffer)
}

def hasNext: Boolean = !processed
}
}
}

override def convertCachedBatchToColumnarBatch(
input: RDD[CachedBatch],
cacheAttributes: Seq[Attribute],
selectedAttributes: Seq[Attribute],
conf: SQLConf): RDD[ColumnarBatch] = {
val columnIndices =
selectedAttributes.map(a => cacheAttributes.map(o => o.exprId).indexOf(a.exprId)).toArray
def createAndDecompressColumn(cachedIter: Iterator[CachedBatch]): Iterator[ColumnarBatch] = {
val res = new Iterator[ColumnarBatch] {
var iter: Iterator[ColumnarBatch] = null
if (cachedIter.hasNext) {
val cachedColumnarBatch: ArrowCachedBatch =
cachedIter.next.asInstanceOf[ArrowCachedBatch]
val rowCount = cachedColumnarBatch.numRows
val rawData = cachedColumnarBatch.buffer

iter = ConverterUtils.convertFromNetty(cacheAttributes, Array(rawData), columnIndices)
}
def next(): ColumnarBatch =
if (iter != null) {
iter.next
} else {
val resultStructType = StructType(selectedAttributes.map(a =>
StructField(a.name, a.dataType, a.nullable, a.metadata)))
val resultColumnVectors =
ArrowWritableColumnVector.allocateColumns(0, resultStructType).toArray
new ColumnarBatch(resultColumnVectors.map(_.asInstanceOf[ColumnVector]), 0)
}
def hasNext: Boolean = iter != null && iter.hasNext
}
new CloseableColumnBatchIterator(res)
}
input.mapPartitions(createAndDecompressColumn)
}

override def convertCachedBatchToInternalRow(
input: RDD[CachedBatch],
cacheAttributes: Seq[Attribute],
selectedAttributes: Seq[Attribute],
conf: SQLConf): RDD[InternalRow] = {
// Find the ordinals and data types of the requested columns.
val columnarBatchRdd =
convertCachedBatchToColumnarBatch(input, cacheAttributes, selectedAttributes, conf)
columnarBatchRdd.mapPartitions { batches =>
val toUnsafe = UnsafeProjection.create(selectedAttributes, selectedAttributes)
batches.flatMap { batch => batch.rowIterator().asScala.map(toUnsafe) }
}
}

override def supportsColumnarOutput(schema: StructType): Boolean = true

override def vectorTypes(attributes: Seq[Attribute], conf: SQLConf): Option[Seq[String]] =
Option(Seq.fill(attributes.length)(classOf[ArrowWritableColumnVector].getName))

}
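As a usage sketch only, assuming the serializer is selected through Spark 3.1's static spark.sql.cache.serializer setting (the StaticSQLConf import above points that way); the session setup here is illustrative.

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("arrow-cached-batch-example")
  // Static conf: must be set before the first SparkSession is created.
  .config("spark.sql.cache.serializer",
    "com.intel.oap.execution.ArrowColumnarCachedBatchSerializer")
  .getOrCreate()

// df.cache() now stores ArrowCachedBatch instances (one serialized Arrow buffer
// per partition) instead of Spark's default row-based CachedBatch.
val df = spark.range(0, 1000).toDF("id")
df.cache()
df.count() // materializes the Arrow-backed cache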
@@ -0,0 +1,135 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.intel.oap.execution

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.columnar.CachedBatch
import org.apache.spark.sql.execution.columnar.InMemoryRelation
import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan, WholeStageCodegenExec}
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.vectorized.ColumnarBatch

case class ColumnarInMemoryTableScanExec(
attributes: Seq[Attribute],
predicates: Seq[Expression],
@transient relation: InMemoryRelation)
extends LeafExecNode {

override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))

override val nodeName: String = {
relation.cacheBuilder.tableName match {
case Some(_) =>
"Scan " + relation.cacheBuilder.cachedName
case _ =>
super.nodeName
}
}

override def innerChildren: Seq[QueryPlan[_]] = Seq(relation) ++ super.innerChildren

override def doCanonicalize(): SparkPlan =
copy(
attributes = attributes.map(QueryPlan.normalizeExpressions(_, relation.output)),
predicates = predicates.map(QueryPlan.normalizeExpressions(_, relation.output)),
relation = relation.canonicalized.asInstanceOf[InMemoryRelation])

override def vectorTypes: Option[Seq[String]] =
relation.cacheBuilder.serializer.vectorTypes(attributes, conf)

/**
* If true, get data from the ColumnVectors in ColumnarBatch, which is generally faster.
* If false, get data from UnsafeRows built from CachedBatch.
*/
override val supportsColumnar: Boolean = true

private lazy val columnarInputRDD: RDD[ColumnarBatch] = {
val numOutputRows = longMetric("numOutputRows")
val buffers = filteredCachedBatches()
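// The configured cache serializer decodes each ArrowCachedBatch buffer directly
// into an Arrow-backed ColumnarBatch, so no row-level conversion happens here.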
relation.cacheBuilder.serializer
.convertCachedBatchToColumnarBatch(buffers, relation.output, attributes, conf)
.map { cb =>
numOutputRows += cb.numRows()
cb
}
}

private lazy val inputRDD: RDD[InternalRow] = {
val numOutputRows = longMetric("numOutputRows")
// Using these variables here to avoid serialization of entire objects (if referenced
// directly) within the mapPartitions closure.
val relOutput = relation.output
val serializer = relation.cacheBuilder.serializer

// update SQL metrics
val withMetrics =
filteredCachedBatches().mapPartitions { iter =>
iter.map { batch =>
numOutputRows += batch.numRows
batch
}
}
serializer.convertCachedBatchToInternalRow(withMetrics, relOutput, attributes, conf)
}

override def output: Seq[Attribute] = attributes

private def updateAttribute(expr: Expression): Expression = {
// Attributes can be pruned, so use relation's output.
// E.g., relation.output is [id, item] but this scan's output can be [item] only.
val attrMap = AttributeMap(relation.cachedPlan.output.zip(relation.output))
expr.transform {
case attr: Attribute => attrMap.getOrElse(attr, attr)
}
}

// The cached version does not change the outputPartitioning of the original SparkPlan.
// But the cached version could alias output, so we need to replace output.
override def outputPartitioning: Partitioning = {
relation.cachedPlan.outputPartitioning match {
case e: Expression => updateAttribute(e).asInstanceOf[Partitioning]
case other => other
}
}

// The cached version does not change the outputOrdering of the original SparkPlan.
// But the cached version could alias output, so we need to replace output.
override def outputOrdering: Seq[SortOrder] =
relation.cachedPlan.outputOrdering.map(updateAttribute(_).asInstanceOf[SortOrder])

// Accumulators used for testing purposes
lazy val readPartitions = sparkContext.longAccumulator
lazy val readBatches = sparkContext.longAccumulator

private def filteredCachedBatches(): RDD[CachedBatch] = {
relation.cacheBuilder.cachedColumnBuffers
}

protected override def doExecute(): RDD[InternalRow] = {
inputRDD
}

protected override def doExecuteColumnar(): RDD[ColumnarBatch] = {
columnarInputRDD
}
}
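Finally, a small sketch of the expected effect, assuming the serializer and the plan overrides above are active; the query itself is illustrative.

// `spark` is a session configured as in the serializer example above.
val cached = spark.range(0, 1000).toDF("id")
cached.cache()
cached.count() // materialize the columnar cache

val evens = cached.filter("id % 2 = 0")
evens.explain() // expect ColumnarInMemoryTableScanExec in place of InMemoryTableScanExec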