From cdc9630a78f007e549bb9e48cb587ec7679a4265 Mon Sep 17 00:00:00 2001 From: Fu Chen Date: Fri, 31 Mar 2023 16:00:38 +0800 Subject: [PATCH] shutdown Py4JServer --- .../engine/spark/session/SparkSQLSessionManager.scala | 2 ++ .../spark/api/python/KyuubiPythonGatewayServer.scala | 11 ++++++++++- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala index 677af9a0394..79f38ce35a4 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala @@ -20,6 +20,7 @@ package org.apache.kyuubi.engine.spark.session import java.util.concurrent.{ScheduledExecutorService, TimeUnit} import org.apache.hive.service.rpc.thrift.TProtocolVersion +import org.apache.spark.api.python.KyuubiPythonGatewayServer import org.apache.spark.sql.SparkSession import org.apache.kyuubi.KyuubiSQLException @@ -94,6 +95,7 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession) override def stop(): Unit = { super.stop() + KyuubiPythonGatewayServer.shutdown() userIsolatedSparkSessionThread.foreach(_.shutdown()) } diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/api/python/KyuubiPythonGatewayServer.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/api/python/KyuubiPythonGatewayServer.scala index 7e15ffe05a6..b7c262b3724 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/api/python/KyuubiPythonGatewayServer.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/api/python/KyuubiPythonGatewayServer.scala @@ -30,10 +30,12 @@ object KyuubiPythonGatewayServer extends Logging { val CONNECTION_FILE_PATH = Utils.createTempDir() + "/connection.info" + private var gatewayServer: Py4JServer = _ + def start(): Unit = { val sparkConf = new SparkConf() - val gatewayServer: Py4JServer = new Py4JServer(sparkConf) + gatewayServer = new Py4JServer(sparkConf) gatewayServer.start() val boundPort: Int = gatewayServer.getListeningPort @@ -65,4 +67,11 @@ object KyuubiPythonGatewayServer extends Logging { System.exit(1) } } + + def shutdown(): Unit = { + if (gatewayServer != null) { + logInfo("shutting down the python gateway server.") + gatewayServer.shutdown() + } + } }