Skip to content

Commit

Permalink
[SPARK-44259][CONNECT][TESTS] Make connect-client-jvm pass on Java …
Browse files Browse the repository at this point in the history
…21 except `RemoteSparkSession`-based tests

### What changes were proposed in this pull request?
This pr ignore all tests inherit `RemoteSparkSession` as default for Java 21 by override the `test` function in `RemoteSparkSession`,  they are all arrow-based tests due to the use of arrow data format for rpc communication in connect.

```
23/06/30 11:45:41 ERROR SparkConnectService: Error during: execute. UserId: . SessionId: e7479b73-d02c-47e9-85c8-40b3e9315561.
java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.DirectByteBuffer.<init>(long, int) not available
	at org.apache.arrow.memory.util.MemoryUtil.directBuffer(MemoryUtil.java:174)
	at org.apache.arrow.memory.ArrowBuf.getDirectBuffer(ArrowBuf.java:229)
	at org.apache.arrow.memory.ArrowBuf.nioBuffer(ArrowBuf.java:224)
	at org.apache.arrow.vector.ipc.WriteChannel.write(WriteChannel.java:133)
	at org.apache.arrow.vector.ipc.message.MessageSerializer.writeBatchBuffers(MessageSerializer.java:303)
	at org.apache.arrow.vector.ipc.message.MessageSerializer.serialize(MessageSerializer.java:276)
	at org.apache.arrow.vector.ipc.message.MessageSerializer.serialize(MessageSerializer.java:237)
	at org.apache.spark.sql.execution.arrow.ArrowConverters$ArrowBatchWithSchemaIterator.$anonfun$next$3(ArrowConverters.scala:174)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1487)
	at org.apache.spark.sql.execution.arrow.ArrowConverters$ArrowBatchWithSchemaIterator.next(ArrowConverters.scala:181)
	at org.apache.spark.sql.execution.arrow.ArrowConverters$ArrowBatchWithSchemaIterator.next(ArrowConverters.scala:128)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at org.apache.spark.sql.connect.service.SparkConnectStreamHandler$.processAsArrowBatches(SparkConnectStreamHandler.scala:178)
	at org.apache.spark.sql.connect.service.SparkConnectStreamHandler.handlePlan(SparkConnectStreamHandler.scala:104)
	at org.apache.spark.sql.connect.service.SparkConnectStreamHandler.$anonfun$handle$1(SparkConnectStreamHandler.scala:86)
	at org.apache.spark.sql.connect.service.SparkConnectStreamHandler.$anonfun$handle$1$adapted(SparkConnectStreamHandler.scala:53)
	at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$3(SessionHolder.scala:152)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:857)
	at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$2(SessionHolder.scala:152)
	at org.apache.spark.JobArtifactSet$.withActive(JobArtifactSet.scala:109)
	at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withContext$1(SessionHolder.scala:122)
	at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:209)
	at org.apache.spark.sql.connect.service.SessionHolder.withContext(SessionHolder.scala:121)
	at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$1(SessionHolder.scala:151)
	at org.apache.spark.sql.connect.service.SessionHolder.withSessionBasedPythonPaths(SessionHolder.scala:137)
	at org.apache.spark.sql.connect.service.SessionHolder.withSession(SessionHolder.scala:150)
	at org.apache.spark.sql.connect.service.SparkConnectStreamHandler.handle(SparkConnectStreamHandler.scala:53)
	at org.apache.spark.sql.connect.service.SparkConnectService.executePlan(SparkConnectService.scala:166)
	at org.apache.spark.connect.proto.SparkConnectServiceGrpc$MethodHandlers.invoke(SparkConnectServiceGrpc.java:584)
	at org.sparkproject.connect.grpc.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:182)
	at org.sparkproject.connect.grpc.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:346)
	at org.sparkproject.connect.grpc.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:860)
	at org.sparkproject.connect.grpc.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
	at org.sparkproject.connect.grpc.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1583)
```

All ignored test related to apache/arrow#35053, so we should wait for upgrading to a new arrow version  and re-enable them for Java 21,  the following TODO JIRA is created for that.

- Reenable Arrow-based connect tests in Java 21:  https://issues.apache.org/jira/browse/SPARK-44121

### Why are the changes needed?
Make Java 21 daily test can monitor other non-arrow based tests.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
- Pass GitHub Actions
- manually tests with Java 21:

```
java -version
openjdk version "21-ea" 2023-09-19
OpenJDK Runtime Environment Zulu21+65-CA (build 21-ea+26)
OpenJDK 64-Bit Server VM Zulu21+65-CA (build 21-ea+26, mixed mode, sharing)
```

```
build/sbt "connect-client-jvm/test" -Phive
```

```
[info] Run completed in 4 seconds, 640 milliseconds.
[info] Total number of tests run: 846
[info] Suites: completed 22, aborted 0
[info] Tests: succeeded 846, failed 0, canceled 167, ignored 1, pending 0
[info] All tests passed.
```

Closes #41805 from LuciferYang/SPARK-44259.

Authored-by: yangjie01 <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
  • Loading branch information
LuciferYang authored and dongjoon-hyun committed Jul 1, 2023
1 parent 6d8d11b commit 8c635a0
Showing 1 changed file with 52 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import java.util.concurrent.TimeUnit

import scala.io.Source

import org.scalatest.BeforeAndAfterAll
import org.apache.commons.lang3.{JavaVersion, SystemUtils}
import org.scalactic.source.Position
import org.scalatest.{BeforeAndAfterAll, Tag}
import sys.process._

import org.apache.spark.sql.SparkSession
Expand Down Expand Up @@ -170,41 +172,44 @@ trait RemoteSparkSession extends ConnectFunSuite with BeforeAndAfterAll {
protected lazy val serverPort: Int = port

override def beforeAll(): Unit = {
super.beforeAll()
SparkConnectServerUtils.start()
spark = SparkSession
.builder()
.client(SparkConnectClient.builder().port(serverPort).build())
.create()

// Retry and wait for the server to start
val stop = System.nanoTime() + TimeUnit.MINUTES.toNanos(1) // ~1 min
var sleepInternalMs = TimeUnit.SECONDS.toMillis(1) // 1s with * 2 backoff
var success = false
val error = new RuntimeException(s"Failed to start the test server on port $serverPort.")

while (!success && System.nanoTime() < stop) {
try {
// Run a simple query to verify the server is really up and ready
val result = spark
.sql("select val from (values ('Hello'), ('World')) as t(val)")
.collect()
assert(result.length == 2)
success = true
debug("Spark Connect Server is up.")
} catch {
// ignored the error
case e: Throwable =>
error.addSuppressed(e)
Thread.sleep(sleepInternalMs)
sleepInternalMs *= 2
// TODO(SPARK-44121) Remove this check condition
if (SystemUtils.isJavaVersionAtMost(JavaVersion.JAVA_17)) {
super.beforeAll()
SparkConnectServerUtils.start()
spark = SparkSession
.builder()
.client(SparkConnectClient.builder().port(serverPort).build())
.create()

// Retry and wait for the server to start
val stop = System.nanoTime() + TimeUnit.MINUTES.toNanos(1) // ~1 min
var sleepInternalMs = TimeUnit.SECONDS.toMillis(1) // 1s with * 2 backoff
var success = false
val error = new RuntimeException(s"Failed to start the test server on port $serverPort.")

while (!success && System.nanoTime() < stop) {
try {
// Run a simple query to verify the server is really up and ready
val result = spark
.sql("select val from (values ('Hello'), ('World')) as t(val)")
.collect()
assert(result.length == 2)
success = true
debug("Spark Connect Server is up.")
} catch {
// ignored the error
case e: Throwable =>
error.addSuppressed(e)
Thread.sleep(sleepInternalMs)
sleepInternalMs *= 2
}
}
}

// Throw error if failed
if (!success) {
debug(error)
throw error
// Throw error if failed
if (!success) {
debug(error)
throw error
}
}
}

Expand All @@ -217,4 +222,17 @@ trait RemoteSparkSession extends ConnectFunSuite with BeforeAndAfterAll {
spark = null
super.afterAll()
}

/**
* SPARK-44259: override test function to skip `RemoteSparkSession-based` tests as default, we
* should delete this function after SPARK-44121 is completed.
*/
override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit
pos: Position): Unit = {
super.test(testName, testTags: _*) {
// TODO(SPARK-44121) Re-enable Arrow-based connect tests in Java 21
assume(SystemUtils.isJavaVersionAtMost(JavaVersion.JAVA_17))
testFun
}
}
}

0 comments on commit 8c635a0

Please sign in to comment.