This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

[NSE-772] Code refactor for ColumnarBatchScan #805

Merged: 2 commits, Mar 29, 2022
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.intel.oap.execution

import com.intel.oap.GazellePluginConfig
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions.{DynamicPruningExpression, Literal, _}
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.connector.read.{Scan}
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.vectorized.ColumnarBatch


class ColumnarBatchScanExec(output: Seq[AttributeReference], @transient scan: Scan,
    runtimeFilters: Seq[Expression])
  extends ColumnarBatchScanExecBase(output, scan, runtimeFilters) {
  val tmpDir: String = GazellePluginConfig.getConf.tmpFile
  override def supportsColumnar(): Boolean = true
  override lazy val metrics = Map(
    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
    "numInputBatches" -> SQLMetrics.createMetric(sparkContext, "input_batches"),
    "numOutputBatches" -> SQLMetrics.createMetric(sparkContext, "output_batches"),
    "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "totaltime_batchscan"),
    "inputSize" -> SQLMetrics.createSizeMetric(sparkContext, "input size in bytes"))

  override def doExecuteColumnar(): RDD[ColumnarBatch] = {
    val numOutputRows = longMetric("numOutputRows")
    val numInputBatches = longMetric("numInputBatches")
    val numOutputBatches = longMetric("numOutputBatches")
    val scanTime = longMetric("scanTime")
    val inputSize = longMetric("inputSize")
    val inputColumnarRDD =
      new ColumnarDataSourceRDD(sparkContext, partitions, readerFactory,
        true, scanTime, numInputBatches, inputSize, tmpDir)
    inputColumnarRDD.map { r =>
      numOutputRows += r.numRows()
      numOutputBatches += 1
      r
    }
  }

  override def doCanonicalize(): ColumnarBatchScanExec = {
    if (runtimeFilters == null) {
      // For Spark 3.1, where BatchScanExec carries no runtimeFilters.
      new ColumnarBatchScanExec(output.map(QueryPlan.normalizeExpressions(_, output)), scan, null)
    } else {
      // For Spark 3.2: drop pruning filters that degenerated to a true literal, then normalize.
      new ColumnarBatchScanExec(
        output.map(QueryPlan.normalizeExpressions(_, output)), scan,
        QueryPlan.normalizePredicates(
          runtimeFilters.filterNot(_ == DynamicPruningExpression(Literal.TrueLiteral)),
          output))
    }
  }

  override def canEqual(other: Any): Boolean = other.isInstanceOf[ColumnarBatchScanExec]

  override def equals(other: Any): Boolean = other match {
    case that: ColumnarBatchScanExec =>
      (that canEqual this) && super.equals(that)
    case _ => false
  }
}
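
Note on the null check in doCanonicalize above: the runtimeFilters argument comes from SparkShimLoader.getSparkShims.getRuntimeFilters (see the rule changes below), whose per-version bodies are not part of this diff. The following is only an assumed sketch of how the two shim variants would behave; the object names are illustrative, and in practice each method lives in its own version-specific shim module.

// Assumed sketch, not code from this PR; compiles against Spark 3.2 dependencies.
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec

object Spark311ShimSketch {
  // Spark 3.1: BatchScanExec has no runtimeFilters, so returning null makes
  // ColumnarBatchScanExec take the "runtimeFilters == null" branch above.
  def getRuntimeFilters(plan: BatchScanExec): Seq[Expression] = null
}

object Spark321ShimSketch {
  // Spark 3.2: forward the filters carried by BatchScanExec itself.
  def getRuntimeFilters(plan: BatchScanExec): Seq[Expression] = plan.runtimeFilters
}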
@@ -27,7 +27,6 @@ import com.intel.oap.sql.shims.SparkShimLoader

import org.apache.spark.{MapOutputStatistics, SparkContext}
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SparkSession, SparkSessionExtensions}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.optimizer.BuildLeft
@@ -51,7 +50,6 @@ import org.apache.spark.sql.execution.joins._
import org.apache.spark.sql.execution.python.{ArrowEvalPythonExec, ColumnarArrowEvalPythonExec}
import org.apache.spark.sql.execution.window.WindowExec
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.vectorized.ColumnarBatch

import org.apache.spark.util.ShufflePartitionUtils

@@ -90,26 +88,7 @@ case class ColumnarPreOverrides() extends Rule[SparkPlan] {
    case plan: BatchScanExec =>
      logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
      val runtimeFilters = SparkShimLoader.getSparkShims.getRuntimeFilters(plan)
      new ColumnarBatchScanExec(plan.output, plan.scan, runtimeFilters) {
        // This method is a commonly shared implementation for ColumnarBatchScanExec.
        // We move it outside of shim layer to break the cyclic dependency caused by
        // ColumnarDataSourceRDD.
        override def doExecuteColumnar(): RDD[ColumnarBatch] = {
          val numOutputRows = longMetric("numOutputRows")
          val numInputBatches = longMetric("numInputBatches")
          val numOutputBatches = longMetric("numOutputBatches")
          val scanTime = longMetric("scanTime")
          val inputSize = longMetric("inputSize")
          val inputColumnarRDD =
            new ColumnarDataSourceRDD(sparkContext, partitions, readerFactory,
              true, scanTime, numInputBatches, inputSize, tmpDir)
          inputColumnarRDD.map { r =>
            numOutputRows += r.numRows()
            numOutputBatches += 1
            r
          }
        }
      }
      new ColumnarBatchScanExec(plan.output, plan.scan, runtimeFilters)
    case plan: CoalesceExec =>
      ColumnarCoalesceExec(plan.numPartitions, replaceWithColumnarPlan(plan.child))
    case plan: InMemoryTableScanExec =>
@@ -37,7 +37,6 @@ import org.apache.spark.sql.execution.joins._
import org.apache.spark.sql.execution.python.ArrowEvalPythonExec
import org.apache.spark.sql.execution.python.ColumnarArrowEvalPythonExec
import org.apache.spark.sql.execution.window.WindowExec
import org.apache.spark.sql.vectorized.ColumnarBatch

case class RowGuard(child: SparkPlan) extends SparkPlan {
  def output: Seq[Attribute] = child.output
@@ -81,26 +80,7 @@ case class ColumnarGuardRule() extends Rule[SparkPlan] {
    case plan: BatchScanExec =>
      if (!enableColumnarBatchScan) return false
      val runtimeFilters = SparkShimLoader.getSparkShims.getRuntimeFilters(plan)
      new ColumnarBatchScanExec(plan.output, plan.scan, runtimeFilters) {
        // This method is a commonly shared implementation for ColumnarBatchScanExec.
        // We move it outside of shim layer to break the cyclic dependency caused by
        // ColumnarDataSourceRDD.
        override def doExecuteColumnar(): RDD[ColumnarBatch] = {
          val numOutputRows = longMetric("numOutputRows")
          val numInputBatches = longMetric("numInputBatches")
          val numOutputBatches = longMetric("numOutputBatches")
          val scanTime = longMetric("scanTime")
          val inputSize = longMetric("inputSize")
          val inputColumnarRDD =
            new ColumnarDataSourceRDD(sparkContext, partitions, readerFactory,
              true, scanTime, numInputBatches, inputSize, tmpDir)
          inputColumnarRDD.map { r =>
            numOutputRows += r.numRows()
            numOutputBatches += 1
            r
          }
        }
      }
      new ColumnarBatchScanExec(plan.output, plan.scan, runtimeFilters)
    case plan: FileSourceScanExec =>
      if (plan.supportsColumnar) {
        return false
@@ -37,7 +37,6 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.ArrowUtils
import org.apache.spark.sql.BasePythonRunnerChild
import org.apache.spark.util.Utils

/**

This file was deleted.

@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.intel.oap.execution

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.connector.read.Scan

/** For Spark 3.1, BatchScanExec has no runtimeFilters: Seq[Expression], so the
 *  parameter is accepted here but not forwarded to the parent constructor. */
abstract class ColumnarBatchScanExecBase(output: Seq[AttributeReference], @transient scan: Scan,
    runtimeFilters: Seq[Expression])
  extends BatchScanExec(output, scan) {

}
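
Because this Spark 3.1 variant never forwards runtimeFilters to BatchScanExec, a null value coming from the shim can pass through the shared constructor without effect. A minimal usage sketch, assuming a Spark 3.1 build and a plan: BatchScanExec matched by a rule such as ColumnarPreOverrides above:

// On Spark 3.1 the shim is assumed to return null (there are no filters to carry).
val runtimeFilters: Seq[Expression] = SparkShimLoader.getSparkShims.getRuntimeFilters(plan)
// The argument is accepted by ColumnarBatchScanExec but, through this base class,
// never reaches BatchScanExec(output, scan).
val columnarScan = new ColumnarBatchScanExec(plan.output, plan.scan, runtimeFilters)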
@@ -16,7 +16,6 @@

package com.intel.oap.sql.shims.spark311

import com.intel.oap.execution.ColumnarBatchScanExec
import com.intel.oap.spark.sql.ArrowWriteQueue
import com.intel.oap.sql.shims.{ShimDescriptor, SparkShims}
import java.io.File

This file was deleted.

@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.intel.oap.execution

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.connector.read.Scan

/**
 * Adapts to the Spark 3.2 BatchScanExec, which carries runtimeFilters and forwards them to the parent constructor.
 */
abstract class ColumnarBatchScanExecBase(output: Seq[AttributeReference], @transient scan: Scan,
    runtimeFilters: Seq[Expression])
  extends BatchScanExec(output, scan, runtimeFilters) {

}
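
For illustration only (not part of this PR): with runtimeFilters forwarded on Spark 3.2, doCanonicalize in ColumnarBatchScanExec strips any dynamic pruning filter that has collapsed to a true literal, so two scans differing only by such a placeholder should canonicalize to equivalent plans. The helper below is hypothetical and only sketches that behaviour.

import com.intel.oap.execution.ColumnarBatchScanExec
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, DynamicPruningExpression, Literal}
import org.apache.spark.sql.connector.read.Scan
import org.apache.spark.sql.execution.SparkPlan

// Hypothetical helper: build the scan with and without the degenerate pruning
// filter and return both canonical forms for comparison.
def canonicalPair(output: Seq[AttributeReference], scan: Scan): (SparkPlan, SparkPlan) = {
  val placeholder = DynamicPruningExpression(Literal.TrueLiteral)
  val withPlaceholder = new ColumnarBatchScanExec(output, scan, Seq(placeholder))
  val withoutFilters = new ColumnarBatchScanExec(output, scan, Seq.empty)
  // doCanonicalize drops the true-literal filter, so both plans are expected to
  // canonicalize to the same form.
  (withPlaceholder.canonicalized, withoutFilters.canonicalized)
}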
@@ -16,7 +16,6 @@

package com.intel.oap.sql.shims.spark321

import com.intel.oap.execution.ColumnarBatchScanExec
import com.intel.oap.spark.sql.ArrowWriteQueue
import com.intel.oap.sql.shims.{ShimDescriptor, SparkShims}
import java.io.File