Skip to content

Commit

Permalink
[NSE-196] clean up native sql options (oap-project#215)
Browse files Browse the repository at this point in the history
* clean up native sql options

Signed-off-by: Yuan Zhou <[email protected]>

* adding more options

Signed-off-by: Yuan Zhou <[email protected]>

* adding more options

Signed-off-by: Yuan Zhou <[email protected]>

* adding warning log for running on non-intel cpu

Signed-off-by: Yuan Zhou <[email protected]>
  • Loading branch information
zhouyuan committed Apr 15, 2021
1 parent c2205a8 commit d8a4d4d
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,20 @@ case class ColumnarGuardRule(conf: SparkConf) extends Rule[SparkPlan] {
val enableColumnarSort = columnarConf.enableColumnarSort
val enableColumnarWindow = columnarConf.enableColumnarWindow
val enableColumnarSortMergeJoin = columnarConf.enableColumnarSortMergeJoin
val enableColumnarBatchScan = columnarConf.enableColumnarBatchScan
val enableColumnarProjFilter = columnarConf.enableColumnarProjFilter
val enableColumnarHashAgg = columnarConf.enableColumnarHashAgg
val enableColumnarUnion = columnarConf.enableColumnarUnion
val enableColumnarExpand = columnarConf.enableColumnarExpand
val enableColumnarShuffledHashJoin = columnarConf.enableColumnarShuffledHashJoin
val enableColumnarBroadcastExchange = columnarConf.enableColumnarBroadcastExchange
val enableColumnarBroadcastJoin = columnarConf.enableColumnarBroadcastJoin

private def tryConvertToColumnar(plan: SparkPlan): Boolean = {
try {
val columnarPlan = plan match {
case plan: BatchScanExec =>
if (!enableColumnarBatchScan) return false
new ColumnarBatchScanExec(plan.output, plan.scan)
case plan: FileSourceScanExec =>
if (plan.supportsColumnar) {
Expand All @@ -73,10 +82,13 @@ case class ColumnarGuardRule(conf: SparkConf) extends Rule[SparkPlan] {
}
plan
case plan: ProjectExec =>
if(!enableColumnarProjFilter) return false
new ColumnarConditionProjectExec(null, plan.projectList, plan.child)
case plan: FilterExec =>
if (!enableColumnarProjFilter) return false
new ColumnarConditionProjectExec(plan.condition, null, plan.child)
case plan: HashAggregateExec =>
if (!enableColumnarHashAgg) return false
new ColumnarHashAggregateExec(
plan.requiredChildDistributionExpressions,
plan.groupingExpressions,
Expand All @@ -86,8 +98,10 @@ case class ColumnarGuardRule(conf: SparkConf) extends Rule[SparkPlan] {
plan.resultExpressions,
plan.child)
case plan: UnionExec =>
if (!enableColumnarUnion) return false
new ColumnarUnionExec(plan.children)
case plan: ExpandExec =>
if (!enableColumnarExpand) return false
new ColumnarExpandExec(plan.projections, plan.output, plan.child)
case plan: SortExec =>
if (!enableColumnarSort) return false
Expand All @@ -99,6 +113,7 @@ case class ColumnarGuardRule(conf: SparkConf) extends Rule[SparkPlan] {
plan.child,
plan.canChangeNumPartitions)
case plan: ShuffledHashJoinExec =>
if (!enableColumnarShuffledHashJoin) return false
ColumnarShuffledHashJoinExec(
plan.leftKeys,
plan.rightKeys,
Expand All @@ -108,10 +123,12 @@ case class ColumnarGuardRule(conf: SparkConf) extends Rule[SparkPlan] {
plan.left,
plan.right)
case plan: BroadcastExchangeExec =>
if (!enableColumnarBroadcastExchange) return false
ColumnarBroadcastExchangeExec(plan.mode, plan.child)
case plan: BroadcastHashJoinExec =>
// We need to check if BroadcastExchangeExec can be converted to columnar-based.
// If not, BHJ should also be row-based.
if (!enableColumnarBroadcastJoin) return false
val left = plan.left
left match {
case exec: BroadcastExchangeExec =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,49 +18,122 @@
package com.intel.oap

import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.sql.internal.SQLConf

case class ColumnarNumaBindingInfo(
enableNumaBinding: Boolean,
totalCoreRange: Array[String] = null,
numCoresPerExecutor: Int = -1) {}

class ColumnarPluginConfig(conf: SQLConf) {
class ColumnarPluginConfig(conf: SQLConf) extends Logging {
def getCpu(): Boolean = {
  // Read the whole of /proc/cpuinfo; the source is always closed, even on failure.
  val source = scala.io.Source.fromFile("/proc/cpuinfo")
  val cpuInfo = try source.mkString finally source.close()
  // TODO(): check CPU flags to enable/disable AVX512
  val isIntel = cpuInfo.contains("GenuineIntel")
  if (!isIntel) {
    // Columnar operators are only validated on Intel CPUs; warn before disabling them.
    logWarning("running on non-intel CPU, disable all columnar operators")
  }
  isIntel
}

// for all operators
val enableCpu = getCpu()

// enable or disable columnar batchscan
val enableColumnarBatchScan: Boolean =
conf.getConfString("spark.oap.sql.columnar.batchscan", "true").toBoolean && enableCpu

// enable or disable columnar hashagg
val enableColumnarHashAgg: Boolean =
conf.getConfString("spark.oap.sql.columnar.hashagg", "true").toBoolean && enableCpu

// enable or disable columnar project and filter
val enableColumnarProjFilter: Boolean =
conf.getConfString("spark.oap.sql.columnar.projfilter", "true").toBoolean && enableCpu

// enable or disable columnar sort
val enableColumnarSort: Boolean =
conf.getConfString("spark.sql.columnar.sort", "false").toBoolean
conf.getConfString("spark.oap.sql.columnar.sort", "true").toBoolean && enableCpu

// enable or disable codegen columnar sort
val enableColumnarCodegenSort: Boolean =
conf.getConfString("spark.sql.columnar.codegen.sort", "true").toBoolean
val enableColumnarNaNCheck: Boolean =
conf.getConfString("spark.sql.columnar.nanCheck", "false").toBoolean
val enableColumnarBroadcastJoin: Boolean =
conf.getConfString("spark.sql.columnar.sort.broadcastJoin", "true").toBoolean
conf.getConfString("spark.oap.sql.columnar.codegen.sort", "true").toBoolean && enableColumnarSort

// enable or disable columnar window
val enableColumnarWindow: Boolean =
conf.getConfString("spark.sql.columnar.window", "true").toBoolean
conf.getConfString("spark.oap.sql.columnar.window", "true").toBoolean && enableCpu

// enable or disable columnar shuffledhashjoin
val enableColumnarShuffledHashJoin: Boolean =
conf.getConfString("spark.oap.sql.columnar.shuffledhashjoin", "true").toBoolean && enableCpu

// enable or disable columnar sortmergejoin
// this should be set with preferSortMergeJoin=false
val enableColumnarSortMergeJoin: Boolean =
conf.getConfString("spark.oap.sql.columnar.sortmergejoin", "false").toBoolean
conf.getConfString("spark.oap.sql.columnar.sortmergejoin", "true").toBoolean && enableCpu

// enable or disable columnar union
val enableColumnarUnion: Boolean =
conf.getConfString("spark.oap.sql.columnar.union", "true").toBoolean && enableCpu

// enable or disable columnar expand
val enableColumnarExpand: Boolean =
conf.getConfString("spark.oap.sql.columnar.expand", "true").toBoolean && enableCpu

// enable or disable columnar broadcastexchange
val enableColumnarBroadcastExchange: Boolean =
conf.getConfString("spark.oap.sql.columnar.broadcastexchange", "true").toBoolean && enableCpu

// enable or disable NAN check
val enableColumnarNaNCheck: Boolean =
conf.getConfString("spark.oap.sql.columnar.nanCheck", "true").toBoolean

// enable or disable hashcompare in hashjoins or hashagg
val hashCompare: Boolean =
conf.getConfString("spark.oap.sql.columnar.hashCompare", "true").toBoolean

// enable or disable columnar BroadcastHashJoin
val enableColumnarBroadcastJoin: Boolean =
conf.getConfString("spark.oap.sql.columnar.broadcastJoin", "true").toBoolean && enableCpu

// enable or disable columnar wholestagecodegen
val enableColumnarWholeStageCodegen: Boolean =
conf.getConfString("spark.oap.sql.columnar.wholestagecodegen", "true").toBoolean && enableCpu

// enable or disable columnar exchange
val enableColumnarShuffle: Boolean = conf
.getConfString("spark.shuffle.manager", "sort")
.equals("org.apache.spark.shuffle.sort.ColumnarShuffleManager") && enableCpu

// for all perf tunings
// prefer to use columnar operators if set to true
val enablePreferColumnar: Boolean =
conf.getConfString("spark.oap.sql.columnar.preferColumnar", "false").toBoolean
val enableJoinOptimizationReplace: Boolean =
conf.getConfString("spark.oap.sql.columnar.joinOptimizationReplace", "false").toBoolean

// fallback to row operators if there are several continuous joins
val joinOptimizationThrottle: Integer =
conf.getConfString("spark.oap.sql.columnar.joinOptimizationLevel", "6").toInt
val enableColumnarWholeStageCodegen: Boolean =
conf.getConfString("spark.oap.sql.columnar.wholestagecodegen", "true").toBoolean
val enableColumnarShuffle: Boolean = conf
.getConfString("spark.shuffle.manager", "sort")
.equals("org.apache.spark.shuffle.sort.ColumnarShuffleManager")

val batchSize: Int =
conf.getConfString("spark.sql.execution.arrow.maxRecordsPerBatch", "10000").toInt

// enable or disable metrics in columnar wholestagecodegen operator
val enableMetricsTime: Boolean =
conf.getConfString(
"spark.oap.sql.columnar.wholestagecodegen.breakdownTime",
"false").toBoolean

// a folder to store the codegen files
val tmpFile: String =
conf.getConfString("spark.sql.columnar.tmp_dir", null)
conf.getConfString("spark.oap.sql.columnar.tmp_dir", null)

@deprecated val broadcastCacheTimeout: Int =
conf.getConfString("spark.sql.columnar.sort.broadcast.cache.timeout", "-1").toInt
val hashCompare: Boolean =
conf.getConfString("spark.oap.sql.columnar.hashCompare", "false").toBoolean

// Whether to spill the partition buffers when buffers are full.
// If false, the partition buffers will be cached in memory first,
// and the cached buffers will be spilled when reach maximum memory.
Expand All @@ -70,8 +143,11 @@ class ColumnarPluginConfig(conf: SQLConf) {
// The supported customized compression codec is lz4 and fastpfor.
val columnarShuffleUseCustomizedCompressionCodec: String =
conf.getConfString("spark.oap.sql.columnar.shuffle.customizedCompression.codec", "lz4")

// a helper flag to check if it's in unit test
val isTesting: Boolean =
conf.getConfString("spark.oap.sql.columnar.testing", "false").toBoolean

val numaBindingInfo: ColumnarNumaBindingInfo = {
val enableNumaBinding: Boolean =
conf.getConfString("spark.oap.sql.columnar.numaBinding", "false").toBoolean
Expand Down

0 comments on commit d8a4d4d

Please sign in to comment.