
Commit f40940c
[NSE-772] Code refactor for ColumnarBatchScan (#805)
* Override doCanonicalize in ColumnarBatchScanExec

* Refactor ColumnarBatchScan
PHILO-HE authored Mar 29, 2022
1 parent d7c4cbd commit f40940c
Showing 10 changed files with 141 additions and 151 deletions.
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.intel.oap.execution

import com.intel.oap.GazellePluginConfig
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions.{DynamicPruningExpression, Literal, _}
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.connector.read.Scan
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.vectorized.ColumnarBatch


class ColumnarBatchScanExec(output: Seq[AttributeReference], @transient scan: Scan,
    runtimeFilters: Seq[Expression])
  extends ColumnarBatchScanExecBase(output, scan, runtimeFilters) {

  val tmpDir: String = GazellePluginConfig.getConf.tmpFile

  override def supportsColumnar(): Boolean = true

  override lazy val metrics = Map(
    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
    "numInputBatches" -> SQLMetrics.createMetric(sparkContext, "input_batches"),
    "numOutputBatches" -> SQLMetrics.createMetric(sparkContext, "output_batches"),
    "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "totaltime_batchscan"),
    "inputSize" -> SQLMetrics.createSizeMetric(sparkContext, "input size in bytes"))

  override def doExecuteColumnar(): RDD[ColumnarBatch] = {
    val numOutputRows = longMetric("numOutputRows")
    val numInputBatches = longMetric("numInputBatches")
    val numOutputBatches = longMetric("numOutputBatches")
    val scanTime = longMetric("scanTime")
    val inputSize = longMetric("inputSize")
    val inputColumnarRDD =
      new ColumnarDataSourceRDD(sparkContext, partitions, readerFactory,
        true, scanTime, numInputBatches, inputSize, tmpDir)
    inputColumnarRDD.map { r =>
      numOutputRows += r.numRows()
      numOutputBatches += 1
      r
    }
  }

  override def doCanonicalize(): ColumnarBatchScanExec = {
    if (runtimeFilters == null) {
      // For spark3.1.
      new ColumnarBatchScanExec(output.map(QueryPlan.normalizeExpressions(_, output)), scan, null)
    } else {
      // For spark3.2.
      new ColumnarBatchScanExec(
        output.map(QueryPlan.normalizeExpressions(_, output)), scan,
        QueryPlan.normalizePredicates(
          runtimeFilters.filterNot(_ == DynamicPruningExpression(Literal.TrueLiteral)),
          output))
    }
  }

  override def canEqual(other: Any): Boolean = other.isInstanceOf[ColumnarBatchScanExec]

  override def equals(other: Any): Boolean = other match {
    case that: ColumnarBatchScanExec =>
      (that canEqual this) && super.equals(that)
    case _ => false
  }
}
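A hedged illustration of what the doCanonicalize override above buys (this snippet is not part of the commit, and the object name is made up): QueryPlan.normalizeExpressions rewrites each output attribute's exprId to its ordinal in the output, so two scans over the same schema that differ only in freshly generated expression IDs should canonicalize to equal plans, which is what lets Spark match them, e.g. for exchange or subquery reuse.

// Illustrative sketch only, not from this commit; assumes a Spark 3.1/3.2 catalyst dependency.
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.types.IntegerType

object NormalizeExprIdDemo {
  def main(args: Array[String]): Unit = {
    // Two outputs over the same schema get distinct, freshly generated exprIds.
    val out1 = Seq(AttributeReference("a", IntegerType)())
    val out2 = Seq(AttributeReference("a", IntegerType)())
    println(out1.head.exprId == out2.head.exprId) // false: IDs differ per instantiation

    // This mirrors output.map(QueryPlan.normalizeExpressions(_, output)) in doCanonicalize:
    // each exprId is replaced by the attribute's ordinal, then canonicalized.
    val norm1 = out1.map(QueryPlan.normalizeExpressions(_, out1))
    val norm2 = out2.map(QueryPlan.normalizeExpressions(_, out2))
    println(norm1 == norm2) // expected true: canonical forms no longer depend on fresh IDs
  }
}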
@@ -27,7 +27,6 @@ import com.intel.oap.sql.shims.SparkShimLoader

import org.apache.spark.{MapOutputStatistics, SparkContext}
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SparkSession, SparkSessionExtensions}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.optimizer.BuildLeft
@@ -51,7 +50,6 @@ import org.apache.spark.sql.execution.joins._
import org.apache.spark.sql.execution.python.{ArrowEvalPythonExec, ColumnarArrowEvalPythonExec}
import org.apache.spark.sql.execution.window.WindowExec
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.vectorized.ColumnarBatch

import org.apache.spark.util.ShufflePartitionUtils

@@ -90,26 +88,7 @@ case class ColumnarPreOverrides() extends Rule[SparkPlan] {
    case plan: BatchScanExec =>
      logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
      val runtimeFilters = SparkShimLoader.getSparkShims.getRuntimeFilters(plan)
      new ColumnarBatchScanExec(plan.output, plan.scan, runtimeFilters) {
        // This method is a commonly shared implementation for ColumnarBatchScanExec.
        // We move it outside of shim layer to break the cyclic dependency caused by
        // ColumnarDataSourceRDD.
        override def doExecuteColumnar(): RDD[ColumnarBatch] = {
          val numOutputRows = longMetric("numOutputRows")
          val numInputBatches = longMetric("numInputBatches")
          val numOutputBatches = longMetric("numOutputBatches")
          val scanTime = longMetric("scanTime")
          val inputSize = longMetric("inputSize")
          val inputColumnarRDD =
            new ColumnarDataSourceRDD(sparkContext, partitions, readerFactory,
              true, scanTime, numInputBatches, inputSize, tmpDir)
          inputColumnarRDD.map { r =>
            numOutputRows += r.numRows()
            numOutputBatches += 1
            r
          }
        }
      }
      new ColumnarBatchScanExec(plan.output, plan.scan, runtimeFilters)
    case plan: CoalesceExec =>
      ColumnarCoalesceExec(plan.numPartitions, replaceWithColumnarPlan(plan.child))
    case plan: InMemoryTableScanExec =>
@@ -37,7 +37,6 @@ import org.apache.spark.sql.execution.joins._
import org.apache.spark.sql.execution.python.ArrowEvalPythonExec
import org.apache.spark.sql.execution.python.ColumnarArrowEvalPythonExec
import org.apache.spark.sql.execution.window.WindowExec
import org.apache.spark.sql.vectorized.ColumnarBatch

case class RowGuard(child: SparkPlan) extends SparkPlan {
def output: Seq[Attribute] = child.output
@@ -81,26 +80,7 @@ case class ColumnarGuardRule() extends Rule[SparkPlan] {
    case plan: BatchScanExec =>
      if (!enableColumnarBatchScan) return false
      val runtimeFilters = SparkShimLoader.getSparkShims.getRuntimeFilters(plan)
      new ColumnarBatchScanExec(plan.output, plan.scan, runtimeFilters) {
        // This method is a commonly shared implementation for ColumnarBatchScanExec.
        // We move it outside of shim layer to break the cyclic dependency caused by
        // ColumnarDataSourceRDD.
        override def doExecuteColumnar(): RDD[ColumnarBatch] = {
          val numOutputRows = longMetric("numOutputRows")
          val numInputBatches = longMetric("numInputBatches")
          val numOutputBatches = longMetric("numOutputBatches")
          val scanTime = longMetric("scanTime")
          val inputSize = longMetric("inputSize")
          val inputColumnarRDD =
            new ColumnarDataSourceRDD(sparkContext, partitions, readerFactory,
              true, scanTime, numInputBatches, inputSize, tmpDir)
          inputColumnarRDD.map { r =>
            numOutputRows += r.numRows()
            numOutputBatches += 1
            r
          }
        }
      }
      new ColumnarBatchScanExec(plan.output, plan.scan, runtimeFilters)
    case plan: FileSourceScanExec =>
      if (plan.supportsColumnar) {
        return false
@@ -37,7 +37,6 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.ArrowUtils
import org.apache.spark.sql.BasePythonRunnerChild
import org.apache.spark.util.Utils

/**

This file was deleted.

@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.intel.oap.execution

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.connector.read.Scan

/**
 * For Spark 3.1: runtimeFilters: Seq[Expression] is not yet part of BatchScanExec's
 * constructor, so it is accepted here but not forwarded to the parent.
 */
abstract class ColumnarBatchScanExecBase(output: Seq[AttributeReference], @transient scan: Scan,
    runtimeFilters: Seq[Expression])
  extends BatchScanExec(output, scan) {

}
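The doCanonicalize branch for Spark 3.1 above relies on runtimeFilters arriving as null. Here is a minimal, hypothetical sketch of the version-shim side: the BatchScanShim trait below is a made-up stand-in for the real SparkShims interface, and only the getRuntimeFilters call visible in ColumnarPreOverrides is taken from the source.

// Hypothetical stand-in for the real SparkShims trait, not part of this commit.
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec

trait BatchScanShim {
  def getRuntimeFilters(plan: BatchScanExec): Seq[Expression]
}

// Spark 3.1 side: BatchScanExec carries no runtime filters, so the shim would
// report null, the sentinel that ColumnarBatchScanExec.doCanonicalize checks for.
object Spark311BatchScanShim extends BatchScanShim {
  override def getRuntimeFilters(plan: BatchScanExec): Seq[Expression] = null
}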
@@ -16,7 +16,6 @@

package com.intel.oap.sql.shims.spark311

import com.intel.oap.execution.ColumnarBatchScanExec
import com.intel.oap.spark.sql.ArrowWriteQueue
import com.intel.oap.sql.shims.{ShimDescriptor, SparkShims}
import java.io.File

This file was deleted.

@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.intel.oap.execution

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.connector.read.Scan

/**
 * This class adapts to the Spark 3.2 BatchScanExec, which takes runtimeFilters
 * as a constructor argument, so the value is forwarded to the parent.
 */
abstract class ColumnarBatchScanExecBase(output: Seq[AttributeReference], @transient scan: Scan,
    runtimeFilters: Seq[Expression])
  extends BatchScanExec(output, scan, runtimeFilters) {

}
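On Spark 3.2, the same shim call would instead forward the runtimeFilters field that BatchScanExec gained in 3.2. Again a hypothetical sketch, mirroring the made-up BatchScanShim stand-in from the Spark 3.1 example; it compiles only against a Spark 3.2 classpath. Note that both ColumnarBatchScanExecBase variants above share one fully qualified name in com.intel.oap.execution, so they cannot coexist on a single classpath; presumably each shim module is built against its own Spark version and only the matching one ships with the plugin for that version.

// Hypothetical sketch, not part of this commit; requires Spark 3.2, where
// BatchScanExec exposes a runtimeFilters field.
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec

object Spark321BatchScanShim {
  def getRuntimeFilters(plan: BatchScanExec): Seq[Expression] = plan.runtimeFilters
}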
@@ -16,7 +16,6 @@

package com.intel.oap.sql.shims.spark321

import com.intel.oap.execution.ColumnarBatchScanExec
import com.intel.oap.spark.sql.ArrowWriteQueue
import com.intel.oap.sql.shims.{ShimDescriptor, SparkShims}
import java.io.File

