Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-44259][CONNECT][TESTS] Make connect-client-jvm pass on Java 21 except RemoteSparkSession-based tests #41805

Closed
wants to merge 4 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import java.util.concurrent.TimeUnit

import scala.io.Source

import org.scalatest.BeforeAndAfterAll
import org.apache.commons.lang3.{JavaVersion, SystemUtils}
import org.scalactic.source.Position
import org.scalatest.{BeforeAndAfterAll, Tag}
import sys.process._

import org.apache.spark.sql.SparkSession
Expand Down Expand Up @@ -170,41 +172,44 @@ trait RemoteSparkSession extends ConnectFunSuite with BeforeAndAfterAll {
protected lazy val serverPort: Int = port

override def beforeAll(): Unit = {
super.beforeAll()
SparkConnectServerUtils.start()
spark = SparkSession
.builder()
.client(SparkConnectClient.builder().port(serverPort).build())
.create()

// Retry and wait for the server to start
val stop = System.nanoTime() + TimeUnit.MINUTES.toNanos(1) // ~1 min
var sleepInternalMs = TimeUnit.SECONDS.toMillis(1) // 1s with * 2 backoff
var success = false
val error = new RuntimeException(s"Failed to start the test server on port $serverPort.")

while (!success && System.nanoTime() < stop) {
try {
// Run a simple query to verify the server is really up and ready
val result = spark
.sql("select val from (values ('Hello'), ('World')) as t(val)")
.collect()
assert(result.length == 2)
success = true
debug("Spark Connect Server is up.")
} catch {
// ignored the error
case e: Throwable =>
error.addSuppressed(e)
Thread.sleep(sleepInternalMs)
sleepInternalMs *= 2
// TODO(SPARK-44121) Remove this check condition
if (SystemUtils.isJavaVersionAtMost(JavaVersion.JAVA_17)) {
super.beforeAll()
SparkConnectServerUtils.start()
spark = SparkSession
.builder()
.client(SparkConnectClient.builder().port(serverPort).build())
.create()

// Retry and wait for the server to start
val stop = System.nanoTime() + TimeUnit.MINUTES.toNanos(1) // ~1 min
var sleepInternalMs = TimeUnit.SECONDS.toMillis(1) // 1s with * 2 backoff
var success = false
val error = new RuntimeException(s"Failed to start the test server on port $serverPort.")

while (!success && System.nanoTime() < stop) {
try {
// Run a simple query to verify the server is really up and ready
val result = spark
.sql("select val from (values ('Hello'), ('World')) as t(val)")
.collect()
assert(result.length == 2)
success = true
debug("Spark Connect Server is up.")
} catch {
// ignored the error
case e: Throwable =>
error.addSuppressed(e)
Thread.sleep(sleepInternalMs)
sleepInternalMs *= 2
}
}
}

// Throw error if failed
if (!success) {
debug(error)
throw error
// Throw error if failed
if (!success) {
debug(error)
throw error
}
}
}

Expand All @@ -217,4 +222,17 @@ trait RemoteSparkSession extends ConnectFunSuite with BeforeAndAfterAll {
spark = null
super.afterAll()
}

/**
* SPARK-44259: override test function to skip `RemoteSparkSession-based` tests as default, we
* should delete this function after SPARK-44121 is completed.
*/
override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit
pos: Position): Unit = {
super.test(testName, testTags: _*) {
// TODO(SPARK-44121) Re-enable Arrow-based connect tests in Java 21
assume(SystemUtils.isJavaVersionAtMost(JavaVersion.JAVA_17))
testFun
}
}
}