DATAFU-179 Support Spark 3.3.x and 3.4.x (#47)
eyala authored Jan 6, 2025
1 parent 516873e commit 601ab12
Showing 5 changed files with 10 additions and 9 deletions.
4 changes: 2 additions & 2 deletions datafu-spark/README.md
@@ -11,7 +11,7 @@ This matrix represents versions of Spark that DataFu has been compiled and tested
| 1.7.0 | 2.2.0 to 2.2.2, 2.3.0 to 2.3.2 and 2.4.0 to 2.4.3|
| 1.8.0 | 2.2.3, 2.3.3, and 2.4.4 to 2.4.5|
| 2.0.0 | 3.0.x - 3.1.x |
-| 2.1.0 (not released yet) | 3.0.x - 3.2.x |
+| 2.1.0 (not released yet) | 3.0.x - 3.4.x |

# Examples

@@ -25,7 +25,7 @@ Here are some examples of things you can do with it:

* [Count distinct up to](https://github.com/apache/datafu/blob/main/datafu-spark/src/main/scala/datafu/spark/Aggregators.scala#L187) - an efficient implementation when you just want to verify that a certain minimum of distinct rows appear in a table

-It has been tested on Spark releases from 3.0.0 to 3.1.3 using Scala 2.12. You can check if your Spark/Scala version combination has been tested by looking [here.](https://github.com/apache/datafu/blob/main/datafu-spark/build_and_test_spark.sh#L20)
+It has been tested on Spark releases from 3.0.0 to 3.4.2 using Scala 2.12. You can check if your Spark/Scala version combination has been tested by looking [here.](https://github.com/apache/datafu/blob/main/datafu-spark/build_and_test_spark.sh#L20)

-----------
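For context, the "count distinct up to" aggregator mentioned in the README exists so that counting can stop once a requested minimum of distinct values has been seen. The following is only a plain-Spark baseline sketch of that check, not the DataFu API; the object name, sample data, column names, and threshold are all illustrative.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.countDistinct

object DistinctThresholdCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("distinct-check").master("local[*]").getOrCreate()
    import spark.implicits._

    // Hypothetical sample data; in practice this would be the table under validation.
    val df = Seq(("a", 1), ("a", 2), ("b", 3), ("c", 4)).toDF("key", "value")

    // Exact distinct count over the whole column. DataFu's aggregator can stop
    // once the requested minimum is reached, which is what makes it cheaper.
    val distinctKeys = df.agg(countDistinct($"key")).first().getLong(0)
    println(s"at least 3 distinct keys: ${distinctKeys >= 3L}")

    spark.stop()
  }
}
```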

2 changes: 1 addition & 1 deletion datafu-spark/build.gradle
@@ -72,7 +72,7 @@ dependencies {
testCompile "com.holdenkarau:spark-testing-base_" + scalaVersion + ":3.1.2_" + sparkTestingBaseVersion
} else if (sparkVersion > "3.2" && sparkVersion < "3.3") {
testCompile "com.holdenkarau:spark-testing-base_" + scalaVersion + ":3.2.1_" + sparkTestingBaseVersion
-} else if (sparkVersion > "3.3" && sparkVersion < "3.4") {
+} else if (sparkVersion >= "3.3") {
testCompile "com.holdenkarau:spark-testing-base_" + scalaVersion + ":3.3.0_" + sparkTestingBaseVersion
} else {
testCompile "com.holdenkarau:spark-testing-base_" + scalaVersion + ":" + sparkVersion + "_" + sparkTestingBaseVersion
5 changes: 3 additions & 2 deletions datafu-spark/build_and_test_spark.sh
@@ -17,8 +17,9 @@

#!/bin/bash

-export SPARK_VERSIONS_FOR_SCALA_212="3.0.1 3.0.2 3.0.3 3.1.1 3.1.2 3.1.3 3.2.0 3.2.1 3.2.2 3.2.3 3.2.4"
-export LATEST_SPARK_VERSIONS_FOR_SCALA_212="3.0.3 3.1.3 3.2.4"
+# we skip 3.0.0 because it has a bug which fails our Aggregator tests
+export SPARK_VERSIONS_FOR_SCALA_212="3.0.1 3.0.2 3.0.3 3.1.1 3.1.2 3.1.3 3.2.0 3.2.1 3.2.2 3.2.3 3.2.4 3.3.0 3.3.1 3.3.2 3.3.3 3.3.4 3.4.0 3.4.1 3.4.2"
+export LATEST_SPARK_VERSIONS_FOR_SCALA_212="3.0.3 3.1.3 3.2.4 3.3.4 3.4.2"


STARTTIME=$(date +%s)
6 changes: 3 additions & 3 deletions datafu-spark/src/main/resources/pyspark_utils/bridge_utils.py
@@ -20,6 +20,7 @@
from py4j.java_gateway import JavaGateway, GatewayParameters
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
+from pyspark.sql import SQLContext
from pyspark.sql import SparkSession

# use jvm gateway to create a java class instance by full-qualified class name
@@ -63,10 +64,9 @@ def __init__(self):
conf = SparkConf(_jvm = gateway.jvm, _jconf = jconf)
self.sc = SparkContext(jsc=jsc, gateway=gateway, conf=conf)

-# Spark 2
+# Spark 3
self.sparkSession = SparkSession(self.sc, jSparkSession)
-self.sqlContext = self.sparkSession._wrapped
-
+self.sqlContext = SQLContext(sparkContext=self.sc, sparkSession=self.sparkSession)
ctx = None


2 changes: 1 addition & 1 deletion datafu-spark/src/main/scala/datafu/spark/SparkDFUtils.scala
@@ -191,7 +191,7 @@ object SparkDFUtils {
* the same functionality as {@link #dedupWithOrder} but implemented using UDAF to utilize
* map side aggregation.
* this function should be used in cases when you expect a large number of rows to get combined,
-* as they share the same group column.
+* as they share the same group column, or if you have some groups with extreme skew.
*
* @param df DataFrame to operate on
* @param groupCol column to group by the records
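As a rough illustration of the map-side aggregation this scaladoc describes, deduplication can be expressed in plain Spark as a combinable `max` over a struct keyed by the ordering column. This is only a conceptual sketch, not DataFu's implementation; the object name, sample data, and column names are made up.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{max, struct}

object DedupByMaxStruct {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("dedup-sketch").master("local[*]").getOrCreate()
    import spark.implicits._

    // Hypothetical data: several rows per user, keep only the one with the latest ts.
    val df = Seq(
      ("u1", 3L, "old"),
      ("u1", 7L, "new"),
      ("u2", 1L, "only")
    ).toDF("userId", "ts", "payload")

    // max over a struct is partially aggregated on the map side before the shuffle,
    // which is what keeps heavily skewed groups from overwhelming a single reducer.
    val deduped = df
      .groupBy($"userId")
      .agg(max(struct($"ts", $"payload")).as("latest"))
      .select($"userId", $"latest.ts", $"latest.payload")

    deduped.show()
    spark.stop()
  }
}
```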
