From b623755a7ca35e19311d984569b9498f09565d3a Mon Sep 17 00:00:00 2001 From: Bowen Liang Date: Fri, 24 Nov 2023 13:04:40 +0800 Subject: [PATCH] config option --- .../flink/WithFlinkSQLEngineLocal.scala | 13 +++-- .../flink/WithFlinkSQLEngineOnYarn.scala | 6 +-- .../HiveCatalogDatabaseOperationSuite.scala | 5 +- .../hive/operation/HiveOperationSuite.scala | 3 +- .../main/scala/org/apache/kyuubi/Utils.scala | 7 +-- .../scala/org/apache/kyuubi/UtilsSuite.scala | 13 ++--- .../engine/chat/ChatProcessBuilder.scala | 19 +++---- .../engine/flink/FlinkProcessBuilder.scala | 29 ++++------ .../engine/hive/HiveProcessBuilder.scala | 21 +++----- .../engine/jdbc/JdbcProcessBuilder.scala | 19 +++---- .../spark/SparkBatchProcessBuilder.scala | 9 ++-- .../engine/spark/SparkProcessBuilder.scala | 11 ++-- .../engine/trino/TrinoProcessBuilder.scala | 19 +++---- .../flink/FlinkProcessBuilderSuite.scala | 3 +- .../spark/SparkProcessBuilderSuite.scala | 1 + .../util/command/CommandLineUtils.scala | 54 +++++++++++++++++++ .../util/command/CommandUtilsSuite.scala | 47 ++++++++++++++++ 17 files changed, 177 insertions(+), 102 deletions(-) create mode 100644 kyuubi-util-scala/src/main/scala/org/apache/kyuubi/util/command/CommandLineUtils.scala create mode 100644 kyuubi-util-scala/src/test/scala/org/apache/kyuubi/util/command/CommandUtilsSuite.scala diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngineLocal.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngineLocal.scala index ccaefb496b0..54685a0b382 100644 --- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngineLocal.scala +++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngineLocal.scala @@ -23,6 +23,7 @@ import java.net.URI import java.nio.file.{Files, Paths} import scala.collection.JavaConverters._ +import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import org.apache.flink.configuration.{Configuration, RestOptions} @@ -32,6 +33,7 @@ import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiException, KyuubiFunSuite, SCALA import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.config.KyuubiConf._ import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ADDRESSES +import org.apache.kyuubi.util.command.CommandLineUtils._ import org.apache.kyuubi.zookeeper.EmbeddedZookeeper import org.apache.kyuubi.zookeeper.ZookeeperConf.{ZK_CLIENT_PORT, ZK_CLIENT_PORT_ADDRESS} @@ -122,8 +124,7 @@ trait WithFlinkSQLEngineLocal extends KyuubiFunSuite with WithFlinkTestResources command += javaOptions.get } - command += "-cp" - val classpathEntries = new java.util.LinkedHashSet[String] + val classpathEntries = new mutable.LinkedHashSet[String] // flink engine runtime jar mainResource(envs).foreach(classpathEntries.add) // flink sql jars @@ -163,13 +164,11 @@ trait WithFlinkSQLEngineLocal extends KyuubiFunSuite with WithFlinkTestResources classpathEntries.add(s"$devHadoopJars${File.separator}*") } } - command += classpathEntries.asScala.mkString(File.pathSeparator) + command ++= genClasspathOption(classpathEntries) + command += "org.apache.kyuubi.engine.flink.FlinkSQLEngine" - conf.getAll.foreach { case (k, v) => - command += "--conf" - command += s"$k=$v" - } + command ++= confKeyValues(conf.getAll) processBuilder.command(command.toList.asJava) processBuilder.redirectOutput(Redirect.INHERIT) diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngineOnYarn.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngineOnYarn.scala index 49fb947a3ec..730a2646bed 100644 --- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngineOnYarn.scala +++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngineOnYarn.scala @@ -34,6 +34,7 @@ import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiFunSuite, SCALA_COMPILE_VERSION, import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.config.KyuubiConf.{ENGINE_FLINK_APPLICATION_JARS, KYUUBI_HOME} import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ADDRESSES +import org.apache.kyuubi.util.command.CommandLineUtils._ import org.apache.kyuubi.zookeeper.EmbeddedZookeeper import org.apache.kyuubi.zookeeper.ZookeeperConf.{ZK_CLIENT_PORT, ZK_CLIENT_PORT_ADDRESS} @@ -179,10 +180,7 @@ trait WithFlinkSQLEngineOnYarn extends KyuubiFunSuite with WithFlinkTestResource conf.set(k, v) } - for ((k, v) <- conf.getAll) { - command += "--conf" - command += s"$k=$v" - } + command ++= confKeyValues(conf.getAll) processBuilder.command(command.toList.asJava) processBuilder.redirectOutput(Redirect.INHERIT) diff --git a/externals/kyuubi-hive-sql-engine/src/test/scala/org/apache/kyuubi/engine/hive/operation/HiveCatalogDatabaseOperationSuite.scala b/externals/kyuubi-hive-sql-engine/src/test/scala/org/apache/kyuubi/engine/hive/operation/HiveCatalogDatabaseOperationSuite.scala index a63de20c7de..7db2d7fdca3 100644 --- a/externals/kyuubi-hive-sql-engine/src/test/scala/org/apache/kyuubi/engine/hive/operation/HiveCatalogDatabaseOperationSuite.scala +++ b/externals/kyuubi-hive-sql-engine/src/test/scala/org/apache/kyuubi/engine/hive/operation/HiveCatalogDatabaseOperationSuite.scala @@ -23,6 +23,7 @@ import org.apache.kyuubi.Utils import org.apache.kyuubi.config.KyuubiConf.ENGINE_OPERATION_CONVERT_CATALOG_DATABASE_ENABLED import org.apache.kyuubi.engine.hive.HiveSQLEngine import org.apache.kyuubi.operation.HiveJDBCTestHelper +import org.apache.kyuubi.util.command.CommandLineUtils._ class HiveCatalogDatabaseOperationSuite extends HiveJDBCTestHelper { @@ -30,9 +31,9 @@ class HiveCatalogDatabaseOperationSuite extends HiveJDBCTestHelper { val metastore = Utils.createTempDir(prefix = getClass.getSimpleName) metastore.toFile.delete() val args = Array( - "--conf", + CONF, s"javax.jdo.option.ConnectionURL=jdbc:derby:;databaseName=$metastore;create=true", - "--conf", + CONF, s"${ENGINE_OPERATION_CONVERT_CATALOG_DATABASE_ENABLED.key}=true") HiveSQLEngine.main(args) super.beforeAll() diff --git a/externals/kyuubi-hive-sql-engine/src/test/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationSuite.scala b/externals/kyuubi-hive-sql-engine/src/test/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationSuite.scala index eb10e0b4144..53cc9457ae1 100644 --- a/externals/kyuubi-hive-sql-engine/src/test/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationSuite.scala +++ b/externals/kyuubi-hive-sql-engine/src/test/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationSuite.scala @@ -22,6 +22,7 @@ import org.apache.commons.lang3.{JavaVersion, SystemUtils} import org.apache.kyuubi.{HiveEngineTests, KYUUBI_VERSION, Utils} import org.apache.kyuubi.engine.hive.HiveSQLEngine import org.apache.kyuubi.jdbc.hive.KyuubiStatement +import org.apache.kyuubi.util.command.CommandLineUtils._ class HiveOperationSuite extends HiveEngineTests { @@ -29,7 +30,7 @@ class HiveOperationSuite extends HiveEngineTests { val metastore = Utils.createTempDir(prefix = getClass.getSimpleName) metastore.toFile.delete() val args = Array( - "--conf", + CONF, s"javax.jdo.option.ConnectionURL=jdbc:derby:;databaseName=$metastore;create=true") HiveSQLEngine.main(args) super.beforeAll() diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala index accfca4c98f..462f0ebd77d 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala @@ -40,6 +40,7 @@ import org.apache.hadoop.util.ShutdownHookManager import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.config.internal.Tests.IS_TESTING +import org.apache.kyuubi.util.command.CommandLineUtils._ object Utils extends Logging { @@ -325,7 +326,7 @@ object Utils extends Logging { require(args.length % 2 == 0, s"Illegal size of arguments.") for (i <- args.indices by 2) { require( - args(i) == "--conf", + args(i) == CONF, s"Unrecognized main arguments prefix ${args(i)}," + s"the argument format is '--conf k=v'.") @@ -347,9 +348,9 @@ object Utils extends Logging { case PATTERN_FOR_KEY_VALUE_ARG(key, value) if nextKV => val (_, newValue) = redact(redactionPattern, Seq((key, value))).head nextKV = false - s"$key=$newValue" + getKeyValuePair(key, newValue) - case cmd if cmd == "--conf" => + case cmd if cmd == CONF => nextKV = true cmd diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/UtilsSuite.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/UtilsSuite.scala index 5973fc6e7a6..2a49cc708d4 100644 --- a/kyuubi-common/src/test/scala/org/apache/kyuubi/UtilsSuite.scala +++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/UtilsSuite.scala @@ -29,6 +29,7 @@ import org.apache.hadoop.security.UserGroupInformation import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.config.KyuubiConf.SERVER_SECRET_REDACTION_PATTERN +import org.apache.kyuubi.util.command.CommandLineUtils._ class UtilsSuite extends KyuubiFunSuite { @@ -158,14 +159,10 @@ class UtilsSuite extends KyuubiFunSuite { val buffer = new ArrayBuffer[String]() buffer += "main" - buffer += "--conf" - buffer += "kyuubi.my.password=sensitive_value" - buffer += "--conf" - buffer += "kyuubi.regular.property1=regular_value" - buffer += "--conf" - buffer += "kyuubi.my.secret=sensitive_value" - buffer += "--conf" - buffer += "kyuubi.regular.property2=regular_value" + buffer ++= confKeyValue("kyuubi.my.password", "sensitive_value") + buffer ++= confKeyValue("kyuubi.regular.property1", "regular_value") + buffer ++= confKeyValue("kyuubi.my.secret", "sensitive_value") + buffer ++= confKeyValue("kyuubi.regular.property2", "regular_value") val commands = buffer.toArray diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/chat/ChatProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/chat/ChatProcessBuilder.scala index 121a20f5f23..73cba650e6c 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/chat/ChatProcessBuilder.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/chat/ChatProcessBuilder.scala @@ -19,9 +19,8 @@ package org.apache.kyuubi.engine.chat import java.io.File import java.nio.file.{Files, Paths} -import java.util -import scala.collection.JavaConverters._ +import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import com.google.common.annotations.VisibleForTesting @@ -33,6 +32,7 @@ import org.apache.kyuubi.config.KyuubiConf._ import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY import org.apache.kyuubi.engine.ProcBuilder import org.apache.kyuubi.operation.log.OperationLog +import org.apache.kyuubi.util.command.CommandLineUtils._ class ChatProcessBuilder( override val proxyUser: String, @@ -69,8 +69,7 @@ class ChatProcessBuilder( val javaOptions = conf.get(ENGINE_CHAT_JAVA_OPTIONS) javaOptions.foreach(buffer += _) - buffer += "-cp" - val classpathEntries = new util.LinkedHashSet[String] + val classpathEntries = new mutable.LinkedHashSet[String] mainResource.foreach(classpathEntries.add) mainResource.foreach { path => val parent = Paths.get(path).getParent @@ -88,16 +87,14 @@ class ChatProcessBuilder( val extraCp = conf.get(ENGINE_CHAT_EXTRA_CLASSPATH) extraCp.foreach(classpathEntries.add) - buffer += classpathEntries.asScala.mkString(File.pathSeparator) + buffer ++= genClasspathOption(classpathEntries) + buffer += mainClass - buffer += "--conf" - buffer += s"$KYUUBI_SESSION_USER_KEY=$proxyUser" + buffer ++= confKeyValue(KYUUBI_SESSION_USER_KEY, proxyUser) + + buffer ++= confKeyValues(conf.getAll) - conf.getAll.foreach { case (k, v) => - buffer += "--conf" - buffer += s"$k=$v" - } buffer.toArray } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala index f43adfbc216..2e51e46be4c 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala @@ -20,7 +20,7 @@ package org.apache.kyuubi.engine.flink import java.io.{File, FilenameFilter} import java.nio.file.{Files, Paths} -import scala.collection.JavaConverters._ +import scala.collection.mutable import scala.collection.mutable.{ArrayBuffer, ListBuffer} import com.google.common.annotations.VisibleForTesting @@ -32,6 +32,7 @@ import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY import org.apache.kyuubi.engine.{ApplicationManagerInfo, KyuubiApplicationManager, ProcBuilder} import org.apache.kyuubi.engine.flink.FlinkProcessBuilder._ import org.apache.kyuubi.operation.log.OperationLog +import org.apache.kyuubi.util.command.CommandLineUtils._ /** * A builder to build flink sql engine progress. @@ -134,14 +135,9 @@ class FlinkProcessBuilder( buffer += s"$mainClass" buffer += s"${mainResource.get}" - buffer += "--conf" - buffer += s"$KYUUBI_SESSION_USER_KEY=$proxyUser" - conf.getAll.foreach { case (k, v) => - if (k.startsWith("kyuubi.")) { - buffer += "--conf" - buffer += s"$k=$v" - } - } + buffer ++= confKeyValue(KYUUBI_SESSION_USER_KEY, proxyUser) + + buffer ++= confKeyValues(conf.getAll.filter(_._1.startsWith("kyuubi."))) buffer.toArray @@ -156,8 +152,7 @@ class FlinkProcessBuilder( buffer += javaOptions.get } - buffer += "-cp" - val classpathEntries = new java.util.LinkedHashSet[String] + val classpathEntries = new mutable.LinkedHashSet[String] // flink engine runtime jar mainResource.foreach(classpathEntries.add) // flink sql jars @@ -201,16 +196,14 @@ class FlinkProcessBuilder( classpathEntries.add(s"$devHadoopJars${File.separator}*") } } - buffer += classpathEntries.asScala.mkString(File.pathSeparator) + buffer ++= genClasspathOption(classpathEntries) + buffer += mainClass - buffer += "--conf" - buffer += s"$KYUUBI_SESSION_USER_KEY=$proxyUser" + buffer ++= confKeyValue(KYUUBI_SESSION_USER_KEY, proxyUser) + + buffer ++= confKeyValues(conf.getAll) - conf.getAll.foreach { case (k, v) => - buffer += "--conf" - buffer += s"$k=$v" - } buffer.toArray } } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/hive/HiveProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/hive/HiveProcessBuilder.scala index 3325d5f2e02..217beb28b45 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/hive/HiveProcessBuilder.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/hive/HiveProcessBuilder.scala @@ -19,9 +19,8 @@ package org.apache.kyuubi.engine.hive import java.io.File import java.nio.file.{Files, Paths} -import java.util -import scala.collection.JavaConverters._ +import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import com.google.common.annotations.VisibleForTesting @@ -33,6 +32,7 @@ import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_ENGINE_ID, KYUUBI_SES import org.apache.kyuubi.engine.{KyuubiApplicationManager, ProcBuilder} import org.apache.kyuubi.engine.hive.HiveProcessBuilder._ import org.apache.kyuubi.operation.log.OperationLog +import org.apache.kyuubi.util.command.CommandLineUtils._ class HiveProcessBuilder( override val proxyUser: String, @@ -65,8 +65,7 @@ class HiveProcessBuilder( } // -Xmx5g // java options - buffer += "-cp" - val classpathEntries = new util.LinkedHashSet[String] + val classpathEntries = new mutable.LinkedHashSet[String] // hive engine runtime jar mainResource.foreach(classpathEntries.add) // classpath contains hive configurations, default to hive.home/conf @@ -101,18 +100,14 @@ class HiveProcessBuilder( classpathEntries.add(s"$devHadoopJars${File.separator}*") } } - buffer += classpathEntries.asScala.mkString(File.pathSeparator) + buffer ++= genClasspathOption(classpathEntries) buffer += mainClass - buffer += "--conf" - buffer += s"$KYUUBI_SESSION_USER_KEY=$proxyUser" - buffer += "--conf" - buffer += s"$KYUUBI_ENGINE_ID=$engineRefId" + buffer ++= confKeyValue(KYUUBI_SESSION_USER_KEY, proxyUser) + buffer ++= confKeyValue(KYUUBI_ENGINE_ID, engineRefId) + + buffer ++= confKeyValues(conf.getAll) - for ((k, v) <- conf.getAll) { - buffer += "--conf" - buffer += s"$k=$v" - } buffer.toArray } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/jdbc/JdbcProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/jdbc/JdbcProcessBuilder.scala index 3849c6431e9..72e80433ec2 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/jdbc/JdbcProcessBuilder.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/jdbc/JdbcProcessBuilder.scala @@ -19,9 +19,8 @@ package org.apache.kyuubi.engine.jdbc import java.io.File import java.nio.file.Paths -import java.util -import scala.collection.JavaConverters._ +import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import com.google.common.annotations.VisibleForTesting @@ -33,6 +32,7 @@ import org.apache.kyuubi.config.KyuubiConf.{ENGINE_JDBC_CONNECTION_PASSWORD, ENG import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY import org.apache.kyuubi.engine.ProcBuilder import org.apache.kyuubi.operation.log.OperationLog +import org.apache.kyuubi.util.command.CommandLineUtils._ class JdbcProcessBuilder( override val proxyUser: String, @@ -72,8 +72,7 @@ class JdbcProcessBuilder( val javaOptions = conf.get(ENGINE_JDBC_JAVA_OPTIONS) javaOptions.foreach(buffer += _) - buffer += "-cp" - val classpathEntries = new util.LinkedHashSet[String] + val classpathEntries = new mutable.LinkedHashSet[String] mainResource.foreach(classpathEntries.add) mainResource.foreach { path => val parent = Paths.get(path).getParent @@ -91,16 +90,14 @@ class JdbcProcessBuilder( val extraCp = conf.get(ENGINE_JDBC_EXTRA_CLASSPATH) extraCp.foreach(classpathEntries.add) - buffer += classpathEntries.asScala.mkString(File.pathSeparator) + buffer ++= genClasspathOption(classpathEntries) + buffer += mainClass - buffer += "--conf" - buffer += s"$KYUUBI_SESSION_USER_KEY=$proxyUser" + buffer ++= confKeyValue(KYUUBI_SESSION_USER_KEY, proxyUser) + + buffer ++= confKeyValues(conf.getAll) - for ((k, v) <- conf.getAll) { - buffer += "--conf" - buffer += s"$k=$v" - } buffer.toArray } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala index ef159bb93ad..a950c504dc0 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala @@ -22,6 +22,7 @@ import scala.collection.mutable.ArrayBuffer import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.engine.KyuubiApplicationManager import org.apache.kyuubi.operation.log.OperationLog +import org.apache.kyuubi.util.command.CommandLineUtils._ class SparkBatchProcessBuilder( override val proxyUser: String, @@ -51,13 +52,11 @@ class SparkBatchProcessBuilder( // tag batch application KyuubiApplicationManager.tagApplication(batchId, "spark", clusterManager(), batchKyuubiConf) - (batchKyuubiConf.getAll ++ + val allConfigs = batchKyuubiConf.getAll ++ sparkAppNameConf() ++ engineLogPathConf() ++ - appendPodNameConf(batchConf)).foreach { case (k, v) => - buffer += CONF - buffer += s"${convertConfigKey(k)}=$v" - } + appendPodNameConf(batchConf) + buffer ++= confKeyValues(allConfigs) setupKerberos(buffer) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala index 086ca057de8..15c2282e47a 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala @@ -38,6 +38,7 @@ import org.apache.kyuubi.ha.HighAvailabilityConf import org.apache.kyuubi.ha.client.AuthTypes import org.apache.kyuubi.operation.log.OperationLog import org.apache.kyuubi.util.{KubernetesUtils, Validator} +import org.apache.kyuubi.util.command.CommandLineUtils._ class SparkProcessBuilder( override val proxyUser: String, @@ -141,8 +142,7 @@ class SparkProcessBuilder( } // pass spark engine log path to spark conf (allConf ++ engineLogPathConf ++ appendPodNameConf(allConf)).foreach { case (k, v) => - buffer += CONF - buffer += s"${convertConfigKey(k)}=$v" + buffer ++= confKeyValue(convertConfigKey(k), v) } setupKerberos(buffer) @@ -289,10 +289,8 @@ class SparkProcessBuilder( def setSparkUserName(userName: String, buffer: ArrayBuffer[String]): Unit = { clusterManager().foreach { cm => if (cm.toUpperCase.startsWith("K8S")) { - buffer += CONF - buffer += s"spark.kubernetes.driverEnv.SPARK_USER_NAME=$userName" - buffer += CONF - buffer += s"spark.executorEnv.SPARK_USER_NAME=$userName" + buffer ++= confKeyValue("spark.kubernetes.driverEnv.SPARK_USER_NAME", userName) + buffer ++= confKeyValue("spark.executorEnv.SPARK_USER_NAME", userName) } } } @@ -335,7 +333,6 @@ object SparkProcessBuilder { "spark.kubernetes.kerberos.krb5.path", "spark.kubernetes.file.upload.path") - final private[spark] val CONF = "--conf" final private[spark] val CLASS = "--class" final private[spark] val PROXY_USER = "--proxy-user" final private[spark] val SPARK_FILES = "spark.files" diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/trino/TrinoProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/trino/TrinoProcessBuilder.scala index 5b755dec5a1..c1b94928862 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/trino/TrinoProcessBuilder.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/trino/TrinoProcessBuilder.scala @@ -19,9 +19,8 @@ package org.apache.kyuubi.engine.trino import java.io.File import java.nio.file.Paths -import java.util -import scala.collection.JavaConverters._ +import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import com.google.common.annotations.VisibleForTesting @@ -33,6 +32,7 @@ import org.apache.kyuubi.config.KyuubiConf._ import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY import org.apache.kyuubi.engine.{KyuubiApplicationManager, ProcBuilder} import org.apache.kyuubi.operation.log.OperationLog +import org.apache.kyuubi.util.command.CommandLineUtils._ class TrinoProcessBuilder( override val proxyUser: String, @@ -68,8 +68,7 @@ class TrinoProcessBuilder( buffer += javaOptions.get } - buffer += "-cp" - val classpathEntries = new util.LinkedHashSet[String] + val classpathEntries = new mutable.LinkedHashSet[String] // trino engine runtime jar mainResource.foreach(classpathEntries.add) @@ -90,20 +89,18 @@ class TrinoProcessBuilder( val extraCp = conf.get(ENGINE_TRINO_EXTRA_CLASSPATH) extraCp.foreach(classpathEntries.add) - buffer += classpathEntries.asScala.mkString(File.pathSeparator) + buffer ++= genClasspathOption(classpathEntries) + buffer += mainClass // TODO: How shall we deal with proxyUser, // user.name // kyuubi.session.user // or just leave it, because we can handle it at operation layer - buffer += "--conf" - buffer += s"$KYUUBI_SESSION_USER_KEY=$proxyUser" + buffer ++= confKeyValue(KYUUBI_SESSION_USER_KEY, proxyUser) + + buffer ++= confKeyValues(conf.getAll) - for ((k, v) <- conf.getAll) { - buffer += "--conf" - buffer += s"$k=$v" - } buffer.toArray } diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala index 84be010ed4b..44cbdeecbc0 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala @@ -29,6 +29,7 @@ import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.config.KyuubiConf.{ENGINE_FLINK_APPLICATION_JARS, ENGINE_FLINK_EXTRA_CLASSPATH, ENGINE_FLINK_JAVA_OPTIONS, ENGINE_FLINK_MEMORY} import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_ENGINE_CREDENTIALS_KEY import org.apache.kyuubi.engine.flink.FlinkProcessBuilder._ +import org.apache.kyuubi.util.command.CommandLineUtils._ class FlinkProcessBuilderSuite extends KyuubiFunSuite { private def sessionModeConf = KyuubiConf() @@ -73,7 +74,7 @@ class FlinkProcessBuilderSuite extends KyuubiFunSuite { private def confStr: String = { sessionModeConf.clone.getAll .filter(!_._1.equals(KYUUBI_ENGINE_CREDENTIALS_KEY)) - .map { case (k, v) => s"\\\\\\n\\t--conf $k=$v" } + .map { case (k, v) => s"\\\\\\n\\t${confKeyValueStr(k, v)}" } .mkString(" ") } diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala index 7bbe4ad0670..a26179177f9 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala @@ -35,6 +35,7 @@ import org.apache.kyuubi.ha.HighAvailabilityConf import org.apache.kyuubi.ha.client.AuthTypes import org.apache.kyuubi.service.ServiceUtils import org.apache.kyuubi.util.AssertionUtils._ +import org.apache.kyuubi.util.command.CommandLineUtils._ class SparkProcessBuilderSuite extends KerberizedTestHelper with MockitoSugar { private def conf = KyuubiConf().set("kyuubi.on", "off") diff --git a/kyuubi-util-scala/src/main/scala/org/apache/kyuubi/util/command/CommandLineUtils.scala b/kyuubi-util-scala/src/main/scala/org/apache/kyuubi/util/command/CommandLineUtils.scala new file mode 100644 index 00000000000..0f00b1049ef --- /dev/null +++ b/kyuubi-util-scala/src/main/scala/org/apache/kyuubi/util/command/CommandLineUtils.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.util.command + +import java.io.File + +object CommandLineUtils { + val CONF = "--conf" + val JAVA_CLASSPATH = "-cp" + + /** + * Assemble key value pair with "=" seperator + */ + def getKeyValuePair(key: String, value: String): String = s"$key=$value".trim + + /** + * Assemble key value pair with config option prefix + * + * @param key + * @param value + * @return + */ + def confKeyValue(key: String, value: String, confOption: String = CONF): Traversable[String] = + Stream(confOption, getKeyValuePair(key, value)) + + def confKeyValueStr(key: String, value: String, confOption: String = CONF): String = + confKeyValue(key, value, confOption).mkString(" ") + + def confKeyValues(configs: Iterable[(String, String)]): Traversable[String] = + configs.flatMap { case (k, v) => confKeyValue(k, v) }.toStream + + /** + * Generate classpath option by assembling the classpath entries with "-cp" prefix + * @param classpathEntries + * @return + */ + def genClasspathOption(classpathEntries: Iterable[String]): Traversable[String] = + Stream(JAVA_CLASSPATH, classpathEntries.mkString(File.pathSeparator)) +} diff --git a/kyuubi-util-scala/src/test/scala/org/apache/kyuubi/util/command/CommandUtilsSuite.scala b/kyuubi-util-scala/src/test/scala/org/apache/kyuubi/util/command/CommandUtilsSuite.scala new file mode 100644 index 00000000000..3dbdec2cc15 --- /dev/null +++ b/kyuubi-util-scala/src/test/scala/org/apache/kyuubi/util/command/CommandUtilsSuite.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kyuubi.util.command + +// scalastyle:off +import org.scalatest.funsuite.AnyFunSuite + +// scalastyle:on +import org.apache.kyuubi.util.command.CommandLineUtils._ + +// scalastyle:off +class CommandUtilsSuite extends AnyFunSuite { +// scalastyle:on + + test("assemble key value pair") { + assertResult("abc=123")(getKeyValuePair("abc", "123")) + assertResult("abc=123")(getKeyValuePair(" abc", "123 ")) + assertResult("abc.def=xyz.123")(getKeyValuePair("abc.def", "xyz.123")) + } + + test("assemble key value pair with config option") { + assertResult("--conf abc=123")(confKeyValueStr("abc", "123")) + assertResult("--conf abc.def=xyz.123")(confKeyValueStr("abc.def", "xyz.123")) + + assertResult(Seq("--conf", "abc=123"))(confKeyValue("abc", "123")) + assertResult(Seq("--conf", "abc.def=xyz.123"))(confKeyValue("abc.def", "xyz.123")) + } + + test("assemble classpath options") { + assertResult(Seq("-cp", "/path/a.jar:/path2/b*.jar"))( + genClasspathOption(Seq("/path/a.jar", "/path2/b*.jar"))) + } +}