From 12e7c9f1389e75a9a184e63c7f9cfaa8820eaa09 Mon Sep 17 00:00:00 2001 From: xorsum Date: Tue, 30 Jul 2024 18:56:01 +0800 Subject: [PATCH 1/2] support Spark 3.2.0 --- .github/workflows/build-ce7-releases.yml | 2 +- .github/workflows/tpcds.yml | 7 ++++ README.md | 2 +- pom.xml | 14 ++++++++ .../blaze/InterceptedValidateSparkPlan.scala | 5 +-- .../apache/spark/sql/blaze/ShimsImpl.scala | 36 +++++++++++++------ .../blaze/plan/ConvertToNativeExec.scala | 4 ++- .../execution/blaze/plan/NativeAggExec.scala | 8 +++-- .../plan/NativeBroadcastExchangeExec.scala | 4 ++- .../blaze/plan/NativeExpandExec.scala | 4 ++- .../blaze/plan/NativeFilterExec.scala | 4 ++- .../blaze/plan/NativeGenerateExec.scala | 4 ++- .../blaze/plan/NativeGlobalLimitExec.scala | 4 ++- .../blaze/plan/NativeLocalLimitExec.scala | 4 ++- ...NativeParquetInsertIntoHiveTableExec.scala | 14 +++++--- .../blaze/plan/NativeParquetSinkExec.scala | 4 ++- .../plan/NativePartialTakeOrderedExec.scala | 4 ++- .../plan/NativeProjectExecProvider.scala | 2 +- .../NativeRenameColumnsExecProvider.scala | 2 +- .../plan/NativeShuffleExchangeExec.scala | 8 +++-- .../execution/blaze/plan/NativeSortExec.scala | 4 ++- .../blaze/plan/NativeTakeOrderedExec.scala | 4 ++- .../blaze/plan/NativeUnionExec.scala | 4 ++- .../blaze/plan/NativeWindowExec.scala | 4 ++- .../BlazeBlockStoreShuffleReader.scala | 4 ++- .../blaze/shuffle/BlazeShuffleManager.scala | 6 ++-- .../blaze/shuffle/BlazeShuffleWriter.scala | 4 ++- .../blaze/plan/NativeBroadcastJoinExec.scala | 24 +++++++++---- .../NativeShuffledHashJoinExecProvider.scala | 4 ++- .../NativeSortMergeJoinExecProvider.scala | 4 ++- 30 files changed, 149 insertions(+), 49 deletions(-) diff --git a/.github/workflows/build-ce7-releases.yml b/.github/workflows/build-ce7-releases.yml index bfa800b8c..81212780d 100644 --- a/.github/workflows/build-ce7-releases.yml +++ b/.github/workflows/build-ce7-releases.yml @@ -11,7 +11,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - sparkver: [spark303, spark324, spark333] + sparkver: [spark303, spark320, spark324, spark333] blazever: [3.0.1] steps: diff --git a/.github/workflows/tpcds.yml b/.github/workflows/tpcds.yml index 1208395ce..3e2775a4b 100644 --- a/.github/workflows/tpcds.yml +++ b/.github/workflows/tpcds.yml @@ -12,6 +12,13 @@ jobs: sparkver: spark303 sparkurl: https://archive.apache.org/dist/spark/spark-3.0.3/spark-3.0.3-bin-hadoop2.7.tgz + test-spark320: + name: Test Spark20 + uses: ./.github/workflows/tpcds-reusable.yml + with: + sparkver: spark320 + sparkurl: https://archive.apache.org/dist/spark/spark-3.2.0/spark-3.2.0-bin-hadoop2.7.tgz + test-spark324: name: Test Spark324 uses: ./.github/workflows/tpcds-reusable.yml diff --git a/README.md b/README.md index 8138a8baa..bb4baf3aa 100644 --- a/README.md +++ b/README.md @@ -72,7 +72,7 @@ _You could either build Blaze in dev mode for debugging or in release mode to un Blaze._ ```shell -SHIM=spark333 # or spark303/spark324/spark351 +SHIM=spark333 # or spark303/spark320/spark324/spark351 MODE=release # or pre mvn package -P"${SHIM}" -P"${MODE}" ``` diff --git a/pom.xml b/pom.xml index 6b4dc267b..e89c50895 100644 --- a/pom.xml +++ b/pom.xml @@ -274,6 +274,20 @@ + + spark320 + + spark320 + spark3 + 1.8 + 2.12 + 2.12.15 + 3.2.9 + 3.0.0 + 3.2.0 + + + spark324 diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/blaze/InterceptedValidateSparkPlan.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/blaze/InterceptedValidateSparkPlan.scala index cd8dc872d..b2d922ab8 100644 --- 
a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/blaze/InterceptedValidateSparkPlan.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/blaze/InterceptedValidateSparkPlan.scala @@ -70,9 +70,10 @@ object InterceptedValidateSparkPlan extends Logging { } } - @enableIf(Seq("spark303").contains(System.getProperty("blaze.shim"))) + @enableIf(Seq("spark303", "spark320").contains(System.getProperty("blaze.shim"))) def validate(plan: SparkPlan): Unit = { - throw new UnsupportedOperationException("validate is not supported in spark 3.0.3") + throw new UnsupportedOperationException( + "validate is not supported in spark 3.0.3 or spark 3.2.0") } @enableIf(Seq("spark324", "spark333", "spark351").contains(System.getProperty("blaze.shim"))) diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/blaze/ShimsImpl.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/blaze/ShimsImpl.scala index 38802a01f..4c9429930 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/blaze/ShimsImpl.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/blaze/ShimsImpl.scala @@ -116,6 +116,8 @@ class ShimsImpl extends Shims with Logging { @enableIf(Seq("spark303").contains(System.getProperty("blaze.shim"))) def shimVersion: String = "spark303" + @enableIf(Seq("spark320").contains(System.getProperty("blaze.shim"))) + def shimVersion: String = "spark320" @enableIf(Seq("spark324").contains(System.getProperty("blaze.shim"))) def shimVersion: String = "spark324" @enableIf(Seq("spark333").contains(System.getProperty("blaze.shim"))) @@ -134,7 +136,7 @@ class ShimsImpl extends Shims with Logging { } } - @enableIf(Seq("spark303").contains(System.getProperty("blaze.shim"))) + @enableIf(Seq("spark303", "spark320").contains(System.getProperty("blaze.shim"))) override def initExtension(): Unit = {} override def createConvertToNativeExec(child: SparkPlan): ConvertToNativeBase = @@ -354,7 +356,9 @@ class ShimsImpl extends Shims with Logging { length: Long, numRecords: Long): FileSegment = new FileSegment(file, offset, length) - @enableIf(Seq("spark324", "spark333", "spark351").contains(System.getProperty("blaze.shim"))) + @enableIf( + Seq("spark320", "spark324", "spark333", "spark351").contains( + System.getProperty("blaze.shim"))) override def commit( dep: ShuffleDependency[_, _, _], shuffleBlockResolver: IndexShuffleBlockResolver, @@ -495,7 +499,9 @@ class ShimsImpl extends Shims with Logging { expr.asInstanceOf[AggregateExpression].filter } - @enableIf(Seq("spark324", "spark333", "spark351").contains(System.getProperty("blaze.shim"))) + @enableIf( + Seq("spark320", "spark324", "spark333", "spark351").contains( + System.getProperty("blaze.shim"))) private def isAQEShuffleRead(exec: SparkPlan): Boolean = { import org.apache.spark.sql.execution.adaptive.AQEShuffleReadExec exec.isInstanceOf[AQEShuffleReadExec] @@ -507,7 +513,9 @@ class ShimsImpl extends Shims with Logging { exec.isInstanceOf[CustomShuffleReaderExec] } - @enableIf(Seq("spark324", "spark333", "spark351").contains(System.getProperty("blaze.shim"))) + @enableIf( + Seq("spark320", "spark324", "spark333", "spark351").contains( + System.getProperty("blaze.shim"))) private def executeNativeAQEShuffleReader(exec: SparkPlan): NativeRDD = { import org.apache.spark.sql.execution.adaptive.AQEShuffleReadExec import org.apache.spark.sql.execution.CoalescedMapperPartitionSpec @@ -686,7 +694,9 @@ class ShimsImpl extends Shims with Logging { } } - 
@enableIf(Seq("spark324", "spark333", "spark351").contains(System.getProperty("blaze.shim"))) + @enableIf( + Seq("spark320", "spark324", "spark333", "spark351").contains( + System.getProperty("blaze.shim"))) override def getSqlContext(sparkPlan: SparkPlan): SQLContext = sparkPlan.session.sqlContext @@ -700,7 +710,9 @@ class ShimsImpl extends Shims with Logging { NativeExprWrapper(nativeExpr, dataType, nullable) } - @enableIf(Seq("spark303", "spark324", "spark333").contains(System.getProperty("blaze.shim"))) + @enableIf( + Seq("spark303", "spark320", "spark324", "spark333").contains( + System.getProperty("blaze.shim"))) private def convertPromotePrecision( e: Expression, isPruningExpr: Boolean, @@ -747,7 +759,7 @@ class ShimsImpl extends Shims with Logging { } } - @enableIf(Seq("spark303", "spark324").contains(System.getProperty("blaze.shim"))) + @enableIf(Seq("spark303", "spark320", "spark324").contains(System.getProperty("blaze.shim"))) private def convertBloomFilterAgg(agg: AggregateFunction): Option[pb.PhysicalAggExprNode] = None @enableIf(Seq("spark333", "spark351").contains(System.getProperty("blaze.shim"))) @@ -771,7 +783,7 @@ class ShimsImpl extends Shims with Logging { } } - @enableIf(Seq("spark303", "spark324").contains(System.getProperty("blaze.shim"))) + @enableIf(Seq("spark303", "spark320", "spark324").contains(System.getProperty("blaze.shim"))) private def convertBloomFilterMightContain( e: Expression, isPruningExpr: Boolean, @@ -782,7 +794,9 @@ class ShimsImpl extends Shims with Logging { case class ForceNativeExecutionWrapper(override val child: SparkPlan) extends ForceNativeExecutionWrapperBase(child) { - @enableIf(Seq("spark324", "spark333", "spark351").contains(System.getProperty("blaze.shim"))) + @enableIf( + Seq("spark320", "spark324", "spark333", "spark351").contains( + System.getProperty("blaze.shim"))) override def withNewChildInternal(newChild: SparkPlan): SparkPlan = copy(child = newChild) @@ -797,6 +811,8 @@ case class NativeExprWrapper( override val nullable: Boolean) extends NativeExprWrapperBase(nativeExpr, dataType, nullable) { - @enableIf(Seq("spark324", "spark333", "spark351").contains(System.getProperty("blaze.shim"))) + @enableIf( + Seq("spark320", "spark324", "spark333", "spark351").contains( + System.getProperty("blaze.shim"))) override def withNewChildrenInternal(newChildren: IndexedSeq[Expression]): Expression = copy() } diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/ConvertToNativeExec.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/ConvertToNativeExec.scala index 2497b07d2..fdc0de19d 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/ConvertToNativeExec.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/ConvertToNativeExec.scala @@ -21,7 +21,9 @@ import com.thoughtworks.enableIf case class ConvertToNativeExec(override val child: SparkPlan) extends ConvertToNativeBase(child) { - @enableIf(Seq("spark324", "spark333", "spark351").contains(System.getProperty("blaze.shim"))) + @enableIf( + Seq("spark320", "spark324", "spark333", "spark351").contains( + System.getProperty("blaze.shim"))) override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = copy(child = newChild) diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeAggExec.scala 
b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeAggExec.scala index afb21cba6..2f206bff8 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeAggExec.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeAggExec.scala @@ -47,7 +47,9 @@ case class NativeAggExec( child) with BaseAggregateExec { - @enableIf(Seq("spark324", "spark333", "spark351").contains(System.getProperty("blaze.shim"))) + @enableIf( + Seq("spark320", "spark324", "spark333", "spark351").contains( + System.getProperty("blaze.shim"))) override val requiredChildDistributionExpressions: Option[Seq[Expression]] = theRequiredChildDistributionExpressions @@ -71,7 +73,9 @@ case class NativeAggExec( override def resultExpressions: Seq[NamedExpression] = output - @enableIf(Seq("spark324", "spark333", "spark351").contains(System.getProperty("blaze.shim"))) + @enableIf( + Seq("spark320", "spark324", "spark333", "spark351").contains( + System.getProperty("blaze.shim"))) override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = copy(child = newChild) diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeBroadcastExchangeExec.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeBroadcastExchangeExec.scala index 4ae23a49f..965238b60 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeBroadcastExchangeExec.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeBroadcastExchangeExec.scala @@ -42,7 +42,9 @@ case class NativeBroadcastExchangeExec(mode: BroadcastMode, override val child: relationFuturePromise.future } - @enableIf(Seq("spark324", "spark333", "spark351").contains(System.getProperty("blaze.shim"))) + @enableIf( + Seq("spark320", "spark324", "spark333", "spark351").contains( + System.getProperty("blaze.shim"))) override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = copy(child = newChild) diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeExpandExec.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeExpandExec.scala index 6a9df770f..cc2119c41 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeExpandExec.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeExpandExec.scala @@ -27,7 +27,9 @@ case class NativeExpandExec( override val child: SparkPlan) extends NativeExpandBase(projections, output, child) { - @enableIf(Seq("spark324", "spark333", "spark351").contains(System.getProperty("blaze.shim"))) + @enableIf( + Seq("spark320", "spark324", "spark333", "spark351").contains( + System.getProperty("blaze.shim"))) override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = copy(child = newChild) diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeFilterExec.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeFilterExec.scala index cfa2ce484..e945fd0e0 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeFilterExec.scala +++ 
b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeFilterExec.scala @@ -23,7 +23,9 @@ import com.thoughtworks.enableIf case class NativeFilterExec(condition: Expression, override val child: SparkPlan) extends NativeFilterBase(condition, child) { - @enableIf(Seq("spark324", "spark333", "spark351").contains(System.getProperty("blaze.shim"))) + @enableIf( + Seq("spark320", "spark324", "spark333", "spark351").contains( + System.getProperty("blaze.shim"))) override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = copy(child = newChild) diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeGenerateExec.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeGenerateExec.scala index a4ac423e3..6570b009e 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeGenerateExec.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeGenerateExec.scala @@ -29,7 +29,9 @@ case class NativeGenerateExec( override val child: SparkPlan) extends NativeGenerateBase(generator, requiredChildOutput, outer, generatorOutput, child) { - @enableIf(Seq("spark324", "spark333", "spark351").contains(System.getProperty("blaze.shim"))) + @enableIf( + Seq("spark320", "spark324", "spark333", "spark351").contains( + System.getProperty("blaze.shim"))) override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = copy(child = newChild) diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeGlobalLimitExec.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeGlobalLimitExec.scala index 678650eaa..20c8c4e30 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeGlobalLimitExec.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeGlobalLimitExec.scala @@ -22,7 +22,9 @@ import com.thoughtworks.enableIf case class NativeGlobalLimitExec(limit: Long, override val child: SparkPlan) extends NativeGlobalLimitBase(limit, child) { - @enableIf(Seq("spark324", "spark333", "spark351").contains(System.getProperty("blaze.shim"))) + @enableIf( + Seq("spark320", "spark324", "spark333", "spark351").contains( + System.getProperty("blaze.shim"))) override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = copy(child = newChild) diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeLocalLimitExec.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeLocalLimitExec.scala index 2d8f40479..63bf6d5bc 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeLocalLimitExec.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeLocalLimitExec.scala @@ -22,7 +22,9 @@ import com.thoughtworks.enableIf case class NativeLocalLimitExec(limit: Long, override val child: SparkPlan) extends NativeLocalLimitBase(limit, child) { - @enableIf(Seq("spark324", "spark333", "spark351").contains(System.getProperty("blaze.shim"))) + @enableIf( + Seq("spark320", "spark324", "spark333", "spark351").contains( + System.getProperty("blaze.shim"))) override protected def withNewChildInternal(newChild: SparkPlan): 
SparkPlan = copy(child = newChild) diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeParquetInsertIntoHiveTableExec.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeParquetInsertIntoHiveTableExec.scala index d591b7585..4e0703f6c 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeParquetInsertIntoHiveTableExec.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeParquetInsertIntoHiveTableExec.scala @@ -31,7 +31,9 @@ case class NativeParquetInsertIntoHiveTableExec( override val child: SparkPlan) extends NativeParquetInsertIntoHiveTableBase(cmd, child) { - @enableIf(Seq("spark303", "spark324", "spark333").contains(System.getProperty("blaze.shim"))) + @enableIf( + Seq("spark303", "spark320", "spark324", "spark333").contains( + System.getProperty("blaze.shim"))) override protected def getInsertIntoHiveTableCommand( table: CatalogTable, partition: Map[String, Option[String]], @@ -69,7 +71,9 @@ case class NativeParquetInsertIntoHiveTableExec( metrics) } - @enableIf(Seq("spark324", "spark333", "spark351").contains(System.getProperty("blaze.shim"))) + @enableIf( + Seq("spark320", "spark324", "spark333", "spark351").contains( + System.getProperty("blaze.shim"))) override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = copy(child = newChild) @@ -77,7 +81,9 @@ case class NativeParquetInsertIntoHiveTableExec( override def withNewChildren(newChildren: Seq[SparkPlan]): SparkPlan = copy(child = newChildren.head) - @enableIf(Seq("spark303", "spark324", "spark333").contains(System.getProperty("blaze.shim"))) + @enableIf( + Seq("spark303", "spark320", "spark324", "spark333").contains( + System.getProperty("blaze.shim"))) class BlazeInsertIntoHiveTable303( table: CatalogTable, partition: Map[String, Option[String]], @@ -102,7 +108,7 @@ case class NativeParquetInsertIntoHiveTableExec( super.run(sparkSession, nativeParquetSink) } - @enableIf(Seq("spark324", "spark333").contains(System.getProperty("blaze.shim"))) + @enableIf(Seq("spark320", "spark324", "spark333").contains(System.getProperty("blaze.shim"))) override def basicWriteJobStatsTracker(hadoopConf: org.apache.hadoop.conf.Configuration) = { import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeParquetSinkExec.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeParquetSinkExec.scala index a2da4e4b5..3b8b7d266 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeParquetSinkExec.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeParquetSinkExec.scala @@ -30,7 +30,9 @@ case class NativeParquetSinkExec( override val metrics: Map[String, SQLMetric]) extends NativeParquetSinkBase(sparkSession, table, partition, child, metrics) { - @enableIf(Seq("spark324", "spark333", "spark351").contains(System.getProperty("blaze.shim"))) + @enableIf( + Seq("spark320", "spark324", "spark333", "spark351").contains( + System.getProperty("blaze.shim"))) override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = copy(child = newChild) diff --git 
a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativePartialTakeOrderedExec.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativePartialTakeOrderedExec.scala index 9ac051bb4..75f4f0e55 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativePartialTakeOrderedExec.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativePartialTakeOrderedExec.scala @@ -28,7 +28,9 @@ case class NativePartialTakeOrderedExec( override val metrics: Map[String, SQLMetric]) extends NativePartialTakeOrderedBase(limit, sortOrder, child, metrics) { - @enableIf(Seq("spark324", "spark333", "spark351").contains(System.getProperty("blaze.shim"))) + @enableIf( + Seq("spark320", "spark324", "spark333", "spark351").contains( + System.getProperty("blaze.shim"))) override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = copy(child = newChild) diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeProjectExecProvider.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeProjectExecProvider.scala index 48874ce92..9c179ba14 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeProjectExecProvider.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeProjectExecProvider.scala @@ -49,7 +49,7 @@ case object NativeProjectExecProvider { NativeProjectExec(projectList, child, addTypeCast) } - @enableIf(Seq("spark324", "spark333").contains(System.getProperty("blaze.shim"))) + @enableIf(Seq("spark320", "spark324", "spark333").contains(System.getProperty("blaze.shim"))) def provide( projectList: Seq[NamedExpression], child: SparkPlan, diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeRenameColumnsExecProvider.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeRenameColumnsExecProvider.scala index dc3ddd2ca..3a6906bee 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeRenameColumnsExecProvider.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeRenameColumnsExecProvider.scala @@ -46,7 +46,7 @@ case object NativeRenameColumnsExecProvider { NativeRenameColumnsExec(child, renamedColumnNames) } - @enableIf(Seq("spark324", "spark333").contains(System.getProperty("blaze.shim"))) + @enableIf(Seq("spark320", "spark324", "spark333").contains(System.getProperty("blaze.shim"))) def provide(child: SparkPlan, renamedColumnNames: Seq[String]): NativeRenameColumnsBase = { import org.apache.spark.sql.catalyst.expressions.NamedExpression import org.apache.spark.sql.catalyst.expressions.SortOrder diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeShuffleExchangeExec.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeShuffleExchangeExec.scala index 9e0f5904e..a7d7eef53 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeShuffleExchangeExec.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeShuffleExchangeExec.scala @@ -154,11 +154,15 @@ case class 
NativeShuffleExchangeExec( override def canChangeNumPartitions: Boolean = outputPartitioning != SinglePartition - @enableIf(Seq("spark324", "spark333", "spark351").contains(System.getProperty("blaze.shim"))) + @enableIf( + Seq("spark320", "spark324", "spark333", "spark351").contains( + System.getProperty("blaze.shim"))) override def shuffleOrigin = org.apache.spark.sql.execution.exchange.ENSURE_REQUIREMENTS - @enableIf(Seq("spark324", "spark333", "spark351").contains(System.getProperty("blaze.shim"))) + @enableIf( + Seq("spark320", "spark324", "spark333", "spark351").contains( + System.getProperty("blaze.shim"))) override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = copy(child = newChild) diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeSortExec.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeSortExec.scala index 9abdd2efe..a1723c9dd 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeSortExec.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeSortExec.scala @@ -26,7 +26,9 @@ case class NativeSortExec( override val child: SparkPlan) extends NativeSortBase(sortOrder, global, child) { - @enableIf(Seq("spark324", "spark333", "spark351").contains(System.getProperty("blaze.shim"))) + @enableIf( + Seq("spark320", "spark324", "spark333", "spark351").contains( + System.getProperty("blaze.shim"))) override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = copy(child = newChild) diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeTakeOrderedExec.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeTakeOrderedExec.scala index 5210144dc..0101096a9 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeTakeOrderedExec.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeTakeOrderedExec.scala @@ -26,7 +26,9 @@ case class NativeTakeOrderedExec( override val child: SparkPlan) extends NativeTakeOrderedBase(limit, sortOrder, child) { - @enableIf(Seq("spark324", "spark333", "spark351").contains(System.getProperty("blaze.shim"))) + @enableIf( + Seq("spark320", "spark324", "spark333", "spark351").contains( + System.getProperty("blaze.shim"))) override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = copy(child = newChild) diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeUnionExec.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeUnionExec.scala index ce76b6d88..2fc4f102e 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeUnionExec.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeUnionExec.scala @@ -22,7 +22,9 @@ import com.thoughtworks.enableIf case class NativeUnionExec(override val children: Seq[SparkPlan]) extends NativeUnionBase(children) { - @enableIf(Seq("spark324", "spark333", "spark351").contains(System.getProperty("blaze.shim"))) + @enableIf( + Seq("spark320", "spark324", "spark333", "spark351").contains( + System.getProperty("blaze.shim"))) override protected def withNewChildrenInternal(newChildren: 
IndexedSeq[SparkPlan]): SparkPlan = copy(children = newChildren) diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeWindowExec.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeWindowExec.scala index c885e72d5..b5f3abe41 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeWindowExec.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeWindowExec.scala @@ -29,7 +29,9 @@ case class NativeWindowExec( override val child: SparkPlan) extends NativeWindowBase(windowExpression, partitionSpec, orderSpec, child) { - @enableIf(Seq("spark324", "spark333", "spark351").contains(System.getProperty("blaze.shim"))) + @enableIf( + Seq("spark320", "spark324", "spark333", "spark351").contains( + System.getProperty("blaze.shim"))) override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = copy(child = newChild) diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/BlazeBlockStoreShuffleReader.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/BlazeBlockStoreShuffleReader.scala index e9d4d65e8..1f74728de 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/BlazeBlockStoreShuffleReader.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/BlazeBlockStoreShuffleReader.scala @@ -44,7 +44,9 @@ class BlazeBlockStoreShuffleReader[K, C]( with Logging { override def readBlocks(): Iterator[(BlockId, InputStream)] = { - @enableIf(Seq("spark324", "spark333", "spark351").contains(System.getProperty("blaze.shim"))) + @enableIf( + Seq("spark320", "spark324", "spark333", "spark351").contains( + System.getProperty("blaze.shim"))) def fetchIterator = new ShuffleBlockFetcherIterator( context, blockManager.blockStoreClient, diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/BlazeShuffleManager.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/BlazeShuffleManager.scala index 3e6b696c7..3c09293d8 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/BlazeShuffleManager.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/BlazeShuffleManager.scala @@ -47,7 +47,9 @@ class BlazeShuffleManager(conf: SparkConf) extends ShuffleManager with Logging { sortShuffleManager.registerShuffle(shuffleId, dependency) } - @enableIf(Seq("spark324", "spark333", "spark351").contains(System.getProperty("blaze.shim"))) + @enableIf( + Seq("spark320", "spark324", "spark333", "spark351").contains( + System.getProperty("blaze.shim"))) override def getReader[K, C]( handle: ShuffleHandle, startMapIndex: Int, @@ -60,7 +62,7 @@ class BlazeShuffleManager(conf: SparkConf) extends ShuffleManager with Logging { if (isArrowShuffle(handle)) { val baseShuffleHandle = handle.asInstanceOf[BaseShuffleHandle[K, _, C]] - @enableIf(Seq("spark324").contains(System.getProperty("blaze.shim"))) + @enableIf(Seq("spark320", "spark324").contains(System.getProperty("blaze.shim"))) def shuffleMergeFinalized = baseShuffleHandle.dependency.shuffleMergeFinalized @enableIf(Seq("spark333", "spark351").contains(System.getProperty("blaze.shim"))) def shuffleMergeFinalized = 
baseShuffleHandle.dependency.isShuffleMergeFinalizedMarked diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/BlazeShuffleWriter.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/BlazeShuffleWriter.scala index 010de3de8..04043b70d 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/BlazeShuffleWriter.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/BlazeShuffleWriter.scala @@ -22,6 +22,8 @@ import com.thoughtworks.enableIf class BlazeShuffleWriter[K, V](metrics: ShuffleWriteMetricsReporter) extends BlazeShuffleWriterBase[K, V](metrics) { - @enableIf(Seq("spark324", "spark333", "spark351").contains(System.getProperty("blaze.shim"))) + @enableIf( + Seq("spark320", "spark324", "spark333", "spark351").contains( + System.getProperty("blaze.shim"))) override def getPartitionLengths(): Array[Long] = partitionLengths } diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/joins/blaze/plan/NativeBroadcastJoinExec.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/joins/blaze/plan/NativeBroadcastJoinExec.scala index c8b10a013..f19f3b881 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/joins/blaze/plan/NativeBroadcastJoinExec.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/joins/blaze/plan/NativeBroadcastJoinExec.scala @@ -47,7 +47,9 @@ case class NativeBroadcastJoinExec( override val condition: Option[Expression] = None - @enableIf(Seq("spark324", "spark333", "spark351").contains(System.getProperty("blaze.shim"))) + @enableIf( + Seq("spark320", "spark324", "spark333", "spark351").contains( + System.getProperty("blaze.shim"))) override def buildSide: org.apache.spark.sql.catalyst.optimizer.BuildSide = broadcastSide match { case BroadcastLeft => org.apache.spark.sql.catalyst.optimizer.BuildLeft @@ -60,7 +62,9 @@ case class NativeBroadcastJoinExec( case BroadcastRight => org.apache.spark.sql.execution.joins.BuildRight } - @enableIf(Seq("spark324", "spark333", "spark351").contains(System.getProperty("blaze.shim"))) + @enableIf( + Seq("spark320", "spark324", "spark333", "spark351").contains( + System.getProperty("blaze.shim"))) override def requiredChildDistribution = { import org.apache.spark.sql.catalyst.plans.physical.BroadcastDistribution import org.apache.spark.sql.catalyst.plans.physical.UnspecifiedDistribution @@ -70,22 +74,30 @@ case class NativeBroadcastJoinExec( BroadcastDistribution(mode) :: UnspecifiedDistribution :: Nil } - @enableIf(Seq("spark324", "spark333", "spark351").contains(System.getProperty("blaze.shim"))) + @enableIf( + Seq("spark320", "spark324", "spark333", "spark351").contains( + System.getProperty("blaze.shim"))) override def supportCodegen: Boolean = false - @enableIf(Seq("spark324", "spark333", "spark351").contains(System.getProperty("blaze.shim"))) + @enableIf( + Seq("spark320", "spark324", "spark333", "spark351").contains( + System.getProperty("blaze.shim"))) override def inputRDDs() = { throw new NotImplementedError("NativeBroadcastJoin dose not support codegen") } - @enableIf(Seq("spark324", "spark333", "spark351").contains(System.getProperty("blaze.shim"))) + @enableIf( + Seq("spark320", "spark324", "spark333", "spark351").contains( + System.getProperty("blaze.shim"))) override protected def prepareRelation( ctx: 
org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext) : org.apache.spark.sql.execution.joins.HashedRelationInfo = { throw new NotImplementedError("NativeBroadcastJoin dose not support codegen") } - @enableIf(Seq("spark324", "spark333", "spark351").contains(System.getProperty("blaze.shim"))) + @enableIf( + Seq("spark320", "spark324", "spark333", "spark351").contains( + System.getProperty("blaze.shim"))) override protected def withNewChildrenInternal( newLeft: SparkPlan, newRight: SparkPlan): SparkPlan = diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/joins/blaze/plan/NativeShuffledHashJoinExecProvider.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/joins/blaze/plan/NativeShuffledHashJoinExecProvider.scala index 3048b97bb..26f9d6f73 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/joins/blaze/plan/NativeShuffledHashJoinExecProvider.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/joins/blaze/plan/NativeShuffledHashJoinExecProvider.scala @@ -25,7 +25,9 @@ import com.thoughtworks.enableIf case object NativeShuffledHashJoinExecProvider { - @enableIf(Seq("spark324", "spark333", "spark351").contains(System.getProperty("blaze.shim"))) + @enableIf( + Seq("spark320", "spark324", "spark333", "spark351").contains( + System.getProperty("blaze.shim"))) def provide( left: SparkPlan, right: SparkPlan, diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/joins/blaze/plan/NativeSortMergeJoinExecProvider.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/joins/blaze/plan/NativeSortMergeJoinExecProvider.scala index 1f0cc919e..f96d972cd 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/joins/blaze/plan/NativeSortMergeJoinExecProvider.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/joins/blaze/plan/NativeSortMergeJoinExecProvider.scala @@ -24,7 +24,9 @@ import com.thoughtworks.enableIf case object NativeSortMergeJoinExecProvider { - @enableIf(Seq("spark324", "spark333", "spark351").contains(System.getProperty("blaze.shim"))) + @enableIf( + Seq("spark320", "spark324", "spark333", "spark351").contains( + System.getProperty("blaze.shim"))) def provide( left: SparkPlan, right: SparkPlan, From bbcf64c6b325a7352a1f290937d48cf301adaa98 Mon Sep 17 00:00:00 2001 From: xorsum Date: Tue, 30 Jul 2024 19:25:28 +0800 Subject: [PATCH 2/2] fix typo --- .github/workflows/tpcds.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/tpcds.yml b/.github/workflows/tpcds.yml index 3e2775a4b..2e6970653 100644 --- a/.github/workflows/tpcds.yml +++ b/.github/workflows/tpcds.yml @@ -13,7 +13,7 @@ jobs: sparkurl: https://archive.apache.org/dist/spark/spark-3.0.3/spark-3.0.3-bin-hadoop2.7.tgz test-spark320: - name: Test Spark20 + name: Test Spark320 uses: ./.github/workflows/tpcds-reusable.yml with: sparkver: spark320
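
Every hunk in this series follows one mechanical pattern: adding "spark320" to the version list of an `@enableIf` annotation. Below is a minimal sketch (not part of the patches) of how that shim pattern works, based on the `com.thoughtworks.enableIf` macro these files already import; the class and member names are hypothetical, and the claim that the Maven profile sets the `blaze.shim` property on the compiler's JVM is an assumption inferred from the build setup:

```scala
import com.thoughtworks.enableIf

// @enableIf is a macro annotation: its predicate is evaluated while the
// Scala compiler runs, and the annotated definition is kept in the
// generated bytecode only when the predicate is true. The Maven profile
// chosen at build time (e.g. `mvn package -Pspark320`) is assumed to set
// the blaze.shim system property for the compiler, so each shim build
// compiles exactly one variant of every version-dependent member.
class ShimSketch { // hypothetical class, for illustration only
  // Spark 3.2.0 shares the post-3.2 TreeNode API (withNewChildInternal
  // etc.), so "spark320" joins the spark324/spark333/spark351 group.
  @enableIf(
    Seq("spark320", "spark324", "spark333", "spark351").contains(
      System.getProperty("blaze.shim")))
  def apiVariant: String = "Spark 3.2+ shim"

  // Spark 3.0.3 predates that API and keeps a separate variant.
  @enableIf(Seq("spark303").contains(System.getProperty("blaze.shim")))
  def apiVariant: String = "Spark 3.0.3 shim"
}
```

Because the predicate is fixed at compile time, only one variant of each member survives in the emitted bytecode and there is no runtime branching. Supporting a new Spark version therefore reduces to registering the `spark320` Maven profile, wiring it into CI, and extending the version list at each `@enableIf` site — which is exactly what these two commits do, file by file.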