From fc44bca6c9538edd664e6895c30b38adafdf88a5 Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Sun, 2 Jul 2023 12:59:52 +0800 Subject: [PATCH 1/4] test java 21 connect --- .github/workflows/build_and_test.yml | 2 +- .../spark/sql/application/ReplE2ESuite.scala | 40 ++++++++++--------- 2 files changed, 23 insertions(+), 19 deletions(-) diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index 85e477efd4e13..aabfb4f85b619 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -25,7 +25,7 @@ on: java: required: false type: string - default: 8 + default: 21-ea branch: description: Branch to run the build against required: false diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/ReplE2ESuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/ReplE2ESuite.scala index 61959234c8790..7501a90415467 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/ReplE2ESuite.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/ReplE2ESuite.scala @@ -20,6 +20,7 @@ import java.io.{PipedInputStream, PipedOutputStream} import java.util.concurrent.{Executors, Semaphore, TimeUnit} import org.apache.commons.io.output.ByteArrayOutputStream +import org.apache.commons.lang3.{JavaVersion, SystemUtils} import org.scalatest.BeforeAndAfterEach import org.apache.spark.sql.connect.client.util.RemoteSparkSession @@ -42,26 +43,29 @@ class ReplE2ESuite extends RemoteSparkSession with BeforeAndAfterEach { } override def beforeAll(): Unit = { - super.beforeAll() - ammoniteOut = new ByteArrayOutputStream() - testSuiteOut = new PipedOutputStream() - // Connect the `testSuiteOut` and `ammoniteIn` pipes - ammoniteIn = new PipedInputStream(testSuiteOut) - errorStream = new ByteArrayOutputStream() - - val args = Array("--port", serverPort.toString) - val task = new Runnable { - override def run(): Unit = { - ConnectRepl.doMain( - args = args, - semaphore = Some(semaphore), - inputStream = ammoniteIn, - outputStream = ammoniteOut, - errorStream = errorStream) + // TODO(SPARK-44121) Remove this check condition + if (SystemUtils.isJavaVersionAtMost(JavaVersion.JAVA_17)) { + super.beforeAll() + ammoniteOut = new ByteArrayOutputStream() + testSuiteOut = new PipedOutputStream() + // Connect the `testSuiteOut` and `ammoniteIn` pipes + ammoniteIn = new PipedInputStream(testSuiteOut) + errorStream = new ByteArrayOutputStream() + + val args = Array("--port", serverPort.toString) + val task = new Runnable { + override def run(): Unit = { + ConnectRepl.doMain( + args = args, + semaphore = Some(semaphore), + inputStream = ammoniteIn, + outputStream = ammoniteOut, + errorStream = errorStream) + } } - } - executorService.submit(task) + executorService.submit(task) + } } override def afterAll(): Unit = { From 1ed1b8cce89f2d9c7d9302ccff6a05290933377e Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Sun, 2 Jul 2023 14:54:42 +0800 Subject: [PATCH 2/4] streaming --- .../org/apache/spark/streaming/util/WriteAheadLogSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala index 60e04403937a2..e898f8bf0564e 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala @@ -476,7 +476,7 @@ class BatchedWriteAheadLogSuite extends CommonWriteAheadLogTests( val batchedWal = new BatchedWriteAheadLog(wal, sparkConf) val e = intercept[SparkException] { - val buffer = mock[ByteBuffer] + val buffer = ByteBuffer.allocate(1) batchedWal.write(buffer, 2L) } assert(e.getCause.getMessage === "Hello!") @@ -546,7 +546,7 @@ class BatchedWriteAheadLogSuite extends CommonWriteAheadLogTests( batchedWal.close() verify(wal, times(1)).close() - intercept[IllegalStateException](batchedWal.write(mock[ByteBuffer], 12L)) + intercept[IllegalStateException](batchedWal.write(ByteBuffer.allocate(1), 12L)) } test("BatchedWriteAheadLog - fail everything in queue during shutdown") { From 503e216d0037f541dd7a71e0b2bd4fdf59ead487 Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Mon, 3 Jul 2023 13:23:43 +0800 Subject: [PATCH 3/4] revert build_and_test.yml --- .github/workflows/build_and_test.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index aabfb4f85b619..85e477efd4e13 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -25,7 +25,7 @@ on: java: required: false type: string - default: 21-ea + default: 8 branch: description: Branch to run the build against required: false From d0a0a6af53c91663568dd13e36300c5f67a51357 Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Wed, 5 Jul 2023 10:06:23 +0800 Subject: [PATCH 4/4] for ga