Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Spark 3.2.0 #538

Merged
merged 2 commits into from
Jul 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/build-ce7-releases.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
sparkver: [spark303, spark324, spark333]
sparkver: [spark303, spark320, spark324, spark333]
blazever: [3.0.1]

steps:
Expand Down
7 changes: 7 additions & 0 deletions .github/workflows/tpcds.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@ jobs:
sparkver: spark303
sparkurl: https://archive.apache.org/dist/spark/spark-3.0.3/spark-3.0.3-bin-hadoop2.7.tgz

test-spark320:
name: Test Spark320
uses: ./.github/workflows/tpcds-reusable.yml
with:
sparkver: spark320
sparkurl: https://archive.apache.org/dist/spark/spark-3.2.0/spark-3.2.0-bin-hadoop2.7.tgz

test-spark324:
name: Test Spark324
uses: ./.github/workflows/tpcds-reusable.yml
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ _You could either build Blaze in dev mode for debugging or in release mode to un
Blaze._

```shell
SHIM=spark333 # or spark303/spark324/spark351
SHIM=spark333 # or spark303/spark320/spark324/spark351
MODE=release # or pre
mvn package -P"${SHIM}" -P"${MODE}"
```
Expand Down
14 changes: 14 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,20 @@
</properties>
</profile>

<profile>
<id>spark320</id>
<properties>
<shimName>spark320</shimName>
<shimPkg>spark3</shimPkg>
<javaVersion>1.8</javaVersion>
<scalaVersion>2.12</scalaVersion>
<scalaLongVersion>2.12.15</scalaLongVersion>
<scalaTestVersion>3.2.9</scalaTestVersion>
<scalafmtVersion>3.0.0</scalafmtVersion>
<sparkVersion>3.2.0</sparkVersion>
</properties>
</profile>

<profile>
<id>spark324</id>
<properties>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,10 @@ object InterceptedValidateSparkPlan extends Logging {
}
}

@enableIf(Seq("spark303").contains(System.getProperty("blaze.shim")))
@enableIf(Seq("spark303", "spark320").contains(System.getProperty("blaze.shim")))
def validate(plan: SparkPlan): Unit = {
throw new UnsupportedOperationException("validate is not supported in spark 3.0.3")
throw new UnsupportedOperationException(
"validate is not supported in spark 3.0.3 or spark 3.2.0")
}

@enableIf(Seq("spark324", "spark333", "spark351").contains(System.getProperty("blaze.shim")))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ class ShimsImpl extends Shims with Logging {

@enableIf(Seq("spark303").contains(System.getProperty("blaze.shim")))
def shimVersion: String = "spark303"
@enableIf(Seq("spark320").contains(System.getProperty("blaze.shim")))
def shimVersion: String = "spark320"
@enableIf(Seq("spark324").contains(System.getProperty("blaze.shim")))
def shimVersion: String = "spark324"
@enableIf(Seq("spark333").contains(System.getProperty("blaze.shim")))
Expand All @@ -134,7 +136,7 @@ class ShimsImpl extends Shims with Logging {
}
}

@enableIf(Seq("spark303").contains(System.getProperty("blaze.shim")))
@enableIf(Seq("spark303", "spark320").contains(System.getProperty("blaze.shim")))
override def initExtension(): Unit = {}

override def createConvertToNativeExec(child: SparkPlan): ConvertToNativeBase =
Expand Down Expand Up @@ -354,7 +356,9 @@ class ShimsImpl extends Shims with Logging {
length: Long,
numRecords: Long): FileSegment = new FileSegment(file, offset, length)

@enableIf(Seq("spark324", "spark333", "spark351").contains(System.getProperty("blaze.shim")))
@enableIf(
Seq("spark320", "spark324", "spark333", "spark351").contains(
System.getProperty("blaze.shim")))
override def commit(
dep: ShuffleDependency[_, _, _],
shuffleBlockResolver: IndexShuffleBlockResolver,
Expand Down Expand Up @@ -495,7 +499,9 @@ class ShimsImpl extends Shims with Logging {
expr.asInstanceOf[AggregateExpression].filter
}

@enableIf(Seq("spark324", "spark333", "spark351").contains(System.getProperty("blaze.shim")))
@enableIf(
Seq("spark320", "spark324", "spark333", "spark351").contains(
System.getProperty("blaze.shim")))
private def isAQEShuffleRead(exec: SparkPlan): Boolean = {
import org.apache.spark.sql.execution.adaptive.AQEShuffleReadExec
exec.isInstanceOf[AQEShuffleReadExec]
Expand All @@ -507,7 +513,9 @@ class ShimsImpl extends Shims with Logging {
exec.isInstanceOf[CustomShuffleReaderExec]
}

@enableIf(Seq("spark324", "spark333", "spark351").contains(System.getProperty("blaze.shim")))
@enableIf(
Seq("spark320", "spark324", "spark333", "spark351").contains(
System.getProperty("blaze.shim")))
private def executeNativeAQEShuffleReader(exec: SparkPlan): NativeRDD = {
import org.apache.spark.sql.execution.adaptive.AQEShuffleReadExec
import org.apache.spark.sql.execution.CoalescedMapperPartitionSpec
Expand Down Expand Up @@ -686,7 +694,9 @@ class ShimsImpl extends Shims with Logging {
}
}

@enableIf(Seq("spark324", "spark333", "spark351").contains(System.getProperty("blaze.shim")))
@enableIf(
Seq("spark320", "spark324", "spark333", "spark351").contains(
System.getProperty("blaze.shim")))
override def getSqlContext(sparkPlan: SparkPlan): SQLContext =
sparkPlan.session.sqlContext

Expand All @@ -700,7 +710,9 @@ class ShimsImpl extends Shims with Logging {
NativeExprWrapper(nativeExpr, dataType, nullable)
}

@enableIf(Seq("spark303", "spark324", "spark333").contains(System.getProperty("blaze.shim")))
@enableIf(
Seq("spark303", "spark320", "spark324", "spark333").contains(
System.getProperty("blaze.shim")))
private def convertPromotePrecision(
e: Expression,
isPruningExpr: Boolean,
Expand Down Expand Up @@ -747,7 +759,7 @@ class ShimsImpl extends Shims with Logging {
}
}

@enableIf(Seq("spark303", "spark324").contains(System.getProperty("blaze.shim")))
@enableIf(Seq("spark303", "spark320", "spark324").contains(System.getProperty("blaze.shim")))
private def convertBloomFilterAgg(agg: AggregateFunction): Option[pb.PhysicalAggExprNode] = None

@enableIf(Seq("spark333", "spark351").contains(System.getProperty("blaze.shim")))
Expand All @@ -771,7 +783,7 @@ class ShimsImpl extends Shims with Logging {
}
}

@enableIf(Seq("spark303", "spark324").contains(System.getProperty("blaze.shim")))
@enableIf(Seq("spark303", "spark320", "spark324").contains(System.getProperty("blaze.shim")))
private def convertBloomFilterMightContain(
e: Expression,
isPruningExpr: Boolean,
Expand All @@ -782,7 +794,9 @@ class ShimsImpl extends Shims with Logging {
case class ForceNativeExecutionWrapper(override val child: SparkPlan)
extends ForceNativeExecutionWrapperBase(child) {

@enableIf(Seq("spark324", "spark333", "spark351").contains(System.getProperty("blaze.shim")))
@enableIf(
Seq("spark320", "spark324", "spark333", "spark351").contains(
System.getProperty("blaze.shim")))
override def withNewChildInternal(newChild: SparkPlan): SparkPlan =
copy(child = newChild)

Expand All @@ -797,6 +811,8 @@ case class NativeExprWrapper(
override val nullable: Boolean)
extends NativeExprWrapperBase(nativeExpr, dataType, nullable) {

@enableIf(Seq("spark324", "spark333", "spark351").contains(System.getProperty("blaze.shim")))
@enableIf(
Seq("spark320", "spark324", "spark333", "spark351").contains(
System.getProperty("blaze.shim")))
override def withNewChildrenInternal(newChildren: IndexedSeq[Expression]): Expression = copy()
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import com.thoughtworks.enableIf

case class ConvertToNativeExec(override val child: SparkPlan) extends ConvertToNativeBase(child) {

@enableIf(Seq("spark324", "spark333", "spark351").contains(System.getProperty("blaze.shim")))
@enableIf(
Seq("spark320", "spark324", "spark333", "spark351").contains(
System.getProperty("blaze.shim")))
override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
copy(child = newChild)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ case class NativeAggExec(
child)
with BaseAggregateExec {

@enableIf(Seq("spark324", "spark333", "spark351").contains(System.getProperty("blaze.shim")))
@enableIf(
Seq("spark320", "spark324", "spark333", "spark351").contains(
System.getProperty("blaze.shim")))
override val requiredChildDistributionExpressions: Option[Seq[Expression]] =
theRequiredChildDistributionExpressions

Expand All @@ -71,7 +73,9 @@ case class NativeAggExec(

override def resultExpressions: Seq[NamedExpression] = output

@enableIf(Seq("spark324", "spark333", "spark351").contains(System.getProperty("blaze.shim")))
@enableIf(
Seq("spark320", "spark324", "spark333", "spark351").contains(
System.getProperty("blaze.shim")))
override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
copy(child = newChild)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ case class NativeBroadcastExchangeExec(mode: BroadcastMode, override val child:
relationFuturePromise.future
}

@enableIf(Seq("spark324", "spark333", "spark351").contains(System.getProperty("blaze.shim")))
@enableIf(
Seq("spark320", "spark324", "spark333", "spark351").contains(
System.getProperty("blaze.shim")))
override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
copy(child = newChild)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ case class NativeExpandExec(
override val child: SparkPlan)
extends NativeExpandBase(projections, output, child) {

@enableIf(Seq("spark324", "spark333", "spark351").contains(System.getProperty("blaze.shim")))
@enableIf(
Seq("spark320", "spark324", "spark333", "spark351").contains(
System.getProperty("blaze.shim")))
override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
copy(child = newChild)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ import com.thoughtworks.enableIf
case class NativeFilterExec(condition: Expression, override val child: SparkPlan)
extends NativeFilterBase(condition, child) {

@enableIf(Seq("spark324", "spark333", "spark351").contains(System.getProperty("blaze.shim")))
@enableIf(
Seq("spark320", "spark324", "spark333", "spark351").contains(
System.getProperty("blaze.shim")))
override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
copy(child = newChild)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ case class NativeGenerateExec(
override val child: SparkPlan)
extends NativeGenerateBase(generator, requiredChildOutput, outer, generatorOutput, child) {

@enableIf(Seq("spark324", "spark333", "spark351").contains(System.getProperty("blaze.shim")))
@enableIf(
Seq("spark320", "spark324", "spark333", "spark351").contains(
System.getProperty("blaze.shim")))
override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
copy(child = newChild)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ import com.thoughtworks.enableIf
case class NativeGlobalLimitExec(limit: Long, override val child: SparkPlan)
extends NativeGlobalLimitBase(limit, child) {

@enableIf(Seq("spark324", "spark333", "spark351").contains(System.getProperty("blaze.shim")))
@enableIf(
Seq("spark320", "spark324", "spark333", "spark351").contains(
System.getProperty("blaze.shim")))
override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
copy(child = newChild)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ import com.thoughtworks.enableIf
case class NativeLocalLimitExec(limit: Long, override val child: SparkPlan)
extends NativeLocalLimitBase(limit, child) {

@enableIf(Seq("spark324", "spark333", "spark351").contains(System.getProperty("blaze.shim")))
@enableIf(
Seq("spark320", "spark324", "spark333", "spark351").contains(
System.getProperty("blaze.shim")))
override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
copy(child = newChild)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ case class NativeParquetInsertIntoHiveTableExec(
override val child: SparkPlan)
extends NativeParquetInsertIntoHiveTableBase(cmd, child) {

@enableIf(Seq("spark303", "spark324", "spark333").contains(System.getProperty("blaze.shim")))
@enableIf(
Seq("spark303", "spark320", "spark324", "spark333").contains(
System.getProperty("blaze.shim")))
override protected def getInsertIntoHiveTableCommand(
table: CatalogTable,
partition: Map[String, Option[String]],
Expand Down Expand Up @@ -69,15 +71,19 @@ case class NativeParquetInsertIntoHiveTableExec(
metrics)
}

@enableIf(Seq("spark324", "spark333", "spark351").contains(System.getProperty("blaze.shim")))
@enableIf(
Seq("spark320", "spark324", "spark333", "spark351").contains(
System.getProperty("blaze.shim")))
override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
copy(child = newChild)

@enableIf(Seq("spark303").contains(System.getProperty("blaze.shim")))
override def withNewChildren(newChildren: Seq[SparkPlan]): SparkPlan =
copy(child = newChildren.head)

@enableIf(Seq("spark303", "spark324", "spark333").contains(System.getProperty("blaze.shim")))
@enableIf(
Seq("spark303", "spark320", "spark324", "spark333").contains(
System.getProperty("blaze.shim")))
class BlazeInsertIntoHiveTable303(
table: CatalogTable,
partition: Map[String, Option[String]],
Expand All @@ -102,7 +108,7 @@ case class NativeParquetInsertIntoHiveTableExec(
super.run(sparkSession, nativeParquetSink)
}

@enableIf(Seq("spark324", "spark333").contains(System.getProperty("blaze.shim")))
@enableIf(Seq("spark320", "spark324", "spark333").contains(System.getProperty("blaze.shim")))
override def basicWriteJobStatsTracker(hadoopConf: org.apache.hadoop.conf.Configuration) = {
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ case class NativeParquetSinkExec(
override val metrics: Map[String, SQLMetric])
extends NativeParquetSinkBase(sparkSession, table, partition, child, metrics) {

@enableIf(Seq("spark324", "spark333", "spark351").contains(System.getProperty("blaze.shim")))
@enableIf(
Seq("spark320", "spark324", "spark333", "spark351").contains(
System.getProperty("blaze.shim")))
override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
copy(child = newChild)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ case class NativePartialTakeOrderedExec(
override val metrics: Map[String, SQLMetric])
extends NativePartialTakeOrderedBase(limit, sortOrder, child, metrics) {

@enableIf(Seq("spark324", "spark333", "spark351").contains(System.getProperty("blaze.shim")))
@enableIf(
Seq("spark320", "spark324", "spark333", "spark351").contains(
System.getProperty("blaze.shim")))
override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
copy(child = newChild)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ case object NativeProjectExecProvider {
NativeProjectExec(projectList, child, addTypeCast)
}

@enableIf(Seq("spark324", "spark333").contains(System.getProperty("blaze.shim")))
@enableIf(Seq("spark320", "spark324", "spark333").contains(System.getProperty("blaze.shim")))
def provide(
projectList: Seq[NamedExpression],
child: SparkPlan,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ case object NativeRenameColumnsExecProvider {
NativeRenameColumnsExec(child, renamedColumnNames)
}

@enableIf(Seq("spark324", "spark333").contains(System.getProperty("blaze.shim")))
@enableIf(Seq("spark320", "spark324", "spark333").contains(System.getProperty("blaze.shim")))
def provide(child: SparkPlan, renamedColumnNames: Seq[String]): NativeRenameColumnsBase = {
import org.apache.spark.sql.catalyst.expressions.NamedExpression
import org.apache.spark.sql.catalyst.expressions.SortOrder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,11 +154,15 @@ case class NativeShuffleExchangeExec(
override def canChangeNumPartitions: Boolean =
outputPartitioning != SinglePartition

@enableIf(Seq("spark324", "spark333", "spark351").contains(System.getProperty("blaze.shim")))
@enableIf(
Seq("spark320", "spark324", "spark333", "spark351").contains(
System.getProperty("blaze.shim")))
override def shuffleOrigin =
org.apache.spark.sql.execution.exchange.ENSURE_REQUIREMENTS

@enableIf(Seq("spark324", "spark333", "spark351").contains(System.getProperty("blaze.shim")))
@enableIf(
Seq("spark320", "spark324", "spark333", "spark351").contains(
System.getProperty("blaze.shim")))
override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
copy(child = newChild)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ case class NativeSortExec(
override val child: SparkPlan)
extends NativeSortBase(sortOrder, global, child) {

@enableIf(Seq("spark324", "spark333", "spark351").contains(System.getProperty("blaze.shim")))
@enableIf(
Seq("spark320", "spark324", "spark333", "spark351").contains(
System.getProperty("blaze.shim")))
override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
copy(child = newChild)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ case class NativeTakeOrderedExec(
override val child: SparkPlan)
extends NativeTakeOrderedBase(limit, sortOrder, child) {

@enableIf(Seq("spark324", "spark333", "spark351").contains(System.getProperty("blaze.shim")))
@enableIf(
Seq("spark320", "spark324", "spark333", "spark351").contains(
System.getProperty("blaze.shim")))
override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
copy(child = newChild)

Expand Down
Loading