From 633a548c6b18e105f19af093a06b4d40c3c17f01 Mon Sep 17 00:00:00 2001
From: philo
Date: Fri, 19 Aug 2022 18:29:42 +0800
Subject: [PATCH] Make required config accessible

---
 .../datasources/v2/arrow/ArrowScan.scala      | 22 ++++++++++++++-
 .../com/intel/oap/sql/shims/SparkShims.scala  |  6 ++--
 .../sql/shims/spark311/Spark311Shims.scala    |  4 +++
 .../sql/shims/spark321/Spark321Shims.scala    |  4 +++
 .../org/apache/spark/sql/util/ShimUtils.scala | 28 +++++++++++++++++++
 5 files changed, 61 insertions(+), 3 deletions(-)
 create mode 100644 shims/spark321/src/main/scala/org/apache/spark/sql/util/ShimUtils.scala

diff --git a/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/v2/arrow/ArrowScan.scala b/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/v2/arrow/ArrowScan.scala
index 01459d7a5..f5d8fc0a5 100644
--- a/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/v2/arrow/ArrowScan.scala
+++ b/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/v2/arrow/ArrowScan.scala
@@ -16,6 +16,8 @@
  */
 package com.intel.oap.spark.sql.execution.datasources.v2.arrow
 
+import com.intel.oap.sql.shims.SparkShimLoader
+
 import java.util.Locale
 
 import scala.collection.JavaConverters._
@@ -25,9 +27,10 @@ import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.connector.read.PartitionReaderFactory
 import org.apache.spark.sql.execution.PartitionedFileUtil
-import org.apache.spark.sql.execution.datasources.{FilePartition, PartitioningAwareFileIndex}
+import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionDirectory, PartitioningAwareFileIndex}
 import org.apache.spark.sql.execution.datasources.v2.FileScan
 import org.apache.spark.sql.execution.datasources.v2.arrow.ScanUtils
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -71,9 +74,26 @@ case class ArrowScan(
     this.copy(partitionFilters = partitionFilters, dataFilters = dataFilters)
 
   // compute maxSplitBytes
+  def maxSplitBytes(
+      sparkSession: SparkSession,
+      selectedPartitions: Seq[PartitionDirectory]): Long = {
+    val defaultMaxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes
+    val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
+    // val minPartitionNum = sparkSession.sessionState.conf.filesMinPartitionNum
+    //   .getOrElse(sparkSession.leafNodeDefaultParallelism)
+    val minPartitionNum = sparkSession.sessionState.conf.filesMinPartitionNum
+      .getOrElse(SparkShimLoader.getSparkShims.leafNodeDefaultParallelism(sparkSession))
+    val PREFERRED_PARTITION_SIZE_LOWER_BOUND: Long = 128 * 1024 * 1024
+    val PREFERRED_PARTITION_SIZE_UPPER_BOUND: Long = 512 * 1024 * 1024
+    val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
+    val bytesPerCore = totalBytes / minPartitionNum
+
+    Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
+  }
 
   override def partitions: Seq[FilePartition] = {
     val selectedPartitions = fileIndex.listFiles(partitionFilters, dataFilters)
+    // val maxSplitBytes = FilePartition.maxSplitBytes(sparkSession, selectedPartitions)
     val maxSplitBytes = FilePartition.maxSplitBytes(sparkSession, selectedPartitions)
     // val partitionAttributes = fileIndex.partitionSchema.toAttributes
     val partitionAttributes = ScanUtils.toAttributes(fileIndex)
diff --git a/shims/common/src/main/scala/com/intel/oap/sql/shims/SparkShims.scala b/shims/common/src/main/scala/com/intel/oap/sql/shims/SparkShims.scala
index 61bd49b57..9a4ccfc2b 100644
--- a/shims/common/src/main/scala/com/intel/oap/sql/shims/SparkShims.scala
+++ b/shims/common/src/main/scala/com/intel/oap/sql/shims/SparkShims.scala
@@ -28,8 +28,7 @@ import org.apache.spark.shuffle.MigratableResolver
 import org.apache.spark.shuffle.ShuffleHandle
 import org.apache.spark.shuffle.api.ShuffleExecutorComponents
 import org.apache.spark.shuffle.sort.SortShuffleWriter
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{SQLContext, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning}
 import org.apache.spark.sql.execution.{ShufflePartitionSpec, SparkPlan}
@@ -121,4 +120,7 @@ trait SparkShims {
   def getEndMapIndexOfCoalescedMapperPartitionSpec(spec: ShufflePartitionSpec): Int
 
   def getNumReducersOfCoalescedMapperPartitionSpec(spec: ShufflePartitionSpec): Int
+
+  def leafNodeDefaultParallelism(sparkSession: SparkSession): Int
+
 }
diff --git a/shims/spark311/src/main/scala/com/intel/oap/sql/shims/spark311/Spark311Shims.scala b/shims/spark311/src/main/scala/com/intel/oap/sql/shims/spark311/Spark311Shims.scala
index 46912bb70..43f7d99d5 100644
--- a/shims/spark311/src/main/scala/com/intel/oap/sql/shims/spark311/Spark311Shims.scala
+++ b/shims/spark311/src/main/scala/com/intel/oap/sql/shims/spark311/Spark311Shims.scala
@@ -205,4 +205,8 @@ class Spark311Shims extends SparkShims {
     throw new RuntimeException("This method should not be invoked in spark 3.1.")
   }
 
+  override def leafNodeDefaultParallelism(sparkSession: SparkSession): Int = {
+    sparkSession.sparkContext.defaultParallelism
+  }
+
 }
\ No newline at end of file
diff --git a/shims/spark321/src/main/scala/com/intel/oap/sql/shims/spark321/Spark321Shims.scala b/shims/spark321/src/main/scala/com/intel/oap/sql/shims/spark321/Spark321Shims.scala
index 34f5bd03c..4c9ca9273 100644
--- a/shims/spark321/src/main/scala/com/intel/oap/sql/shims/spark321/Spark321Shims.scala
+++ b/shims/spark321/src/main/scala/com/intel/oap/sql/shims/spark321/Spark321Shims.scala
@@ -235,4 +235,8 @@ class Spark321Shims extends SparkShims {
     }
   }
 
+  override def leafNodeDefaultParallelism(sparkSession: SparkSession): Int = {
+    org.apache.spark.sql.util.ShimUtils.leafNodeDefaultParallelism(sparkSession)
+  }
+
 }
\ No newline at end of file
diff --git a/shims/spark321/src/main/scala/org/apache/spark/sql/util/ShimUtils.scala b/shims/spark321/src/main/scala/org/apache/spark/sql/util/ShimUtils.scala
new file mode 100644
index 000000000..26b2ea965
--- /dev/null
+++ b/shims/spark321/src/main/scala/org/apache/spark/sql/util/ShimUtils.scala
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2020 Intel Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.util
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.internal.SQLConf
+
+object ShimUtils {
+
+  def leafNodeDefaultParallelism(sparkSession: SparkSession): Int = {
+    sparkSession.conf.get(SQLConf.LEAF_NODE_DEFAULT_PARALLELISM).getOrElse(
+      sparkSession.sparkContext.defaultParallelism)
+  }
+}
\ No newline at end of file
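
Usage note (not part of the patch): the new SparkShims.leafNodeDefaultParallelism hook lets
ArrowScan's split-size computation fall back to a per-version default when
spark.sql.files.minPartitionNum is unset; the Spark 3.2 shim resolves it through ShimUtils and
SQLConf.LEAF_NODE_DEFAULT_PARALLELISM, while the Spark 3.1 shim returns
sparkContext.defaultParallelism. The sketch below shows how a caller would resolve the value
through the shim layer, assuming the usual spark.sql.leafNodeDefaultParallelism key for that
config entry; the object name and session settings are illustrative only.

import com.intel.oap.sql.shims.SparkShimLoader
import org.apache.spark.sql.SparkSession

object LeafParallelismExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[4]")
      // Assumed key for SQLConf.LEAF_NODE_DEFAULT_PARALLELISM; honored by the Spark 3.2 shim,
      // ignored by the Spark 3.1 shim, which always uses sparkContext.defaultParallelism.
      .config("spark.sql.leafNodeDefaultParallelism", "8")
      .getOrCreate()

    // Typically 8 under the Spark 3.2 shim (config set above), 4 under the 3.1 shim (local[4]).
    val parallelism = SparkShimLoader.getSparkShims.leafNodeDefaultParallelism(spark)
    println(s"leaf node default parallelism = $parallelism")

    spark.stop()
  }
}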