From ca861fea21adc4e6ec95eced7076cb27fc86ea18 Mon Sep 17 00:00:00 2001
From: liuxian
Date: Wed, 5 Sep 2018 10:43:46 +0800
Subject: [PATCH] [SPARK-25300][CORE] Unified the configuration parameter
 `spark.shuffle.service.enabled`

## What changes were proposed in this pull request?

The configuration parameter "spark.shuffle.service.enabled" is already defined as `SHUFFLE_SERVICE_ENABLED` in `package.scala`, yet it is still referenced by its raw string key in many places, so we can replace those usages with `SHUFFLE_SERVICE_ENABLED`. This patch unifies the configuration parameter "spark.shuffle.service.port" in the same way, adding a `SHUFFLE_SERVICE_PORT` entry.
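For illustration, a minimal sketch of the before/after access pattern (an example only, not part of the patch; note that the `config` entries and `SparkConf.get(ConfigEntry)` are `private[spark]`, so this compiles only inside Spark itself, and the `val` names are illustrative):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.internal.config

val conf = new SparkConf()

// Before: raw string keys, with the default value repeated at every call site.
val enabledOld: Boolean = conf.getBoolean("spark.shuffle.service.enabled", false)
val portOld: Int = conf.getInt("spark.shuffle.service.port", 7337)

// After: typed ConfigEntry constants; the key and its default live in exactly
// one place, core/src/main/scala/org/apache/spark/internal/config/package.scala.
val enabled: Boolean = conf.get(config.SHUFFLE_SERVICE_ENABLED)
val port: Int = conf.get(config.SHUFFLE_SERVICE_PORT)

// Writes still go through the string key, obtained from the entry via `.key`,
// so the externally visible configuration name is unchanged.
conf.set(config.SHUFFLE_SERVICE_ENABLED.key, "true")
```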
## How was this patch tested?

N/A

Closes #22306 from 10110346/unifiedserviceenable.

Authored-by: liuxian
Signed-off-by: Wenchen Fan
---
 .../org/apache/spark/ExecutorAllocationManager.scala     | 4 ++--
 .../org/apache/spark/deploy/ExternalShuffleService.scala | 8 ++++----
 .../scala/org/apache/spark/deploy/LocalSparkCluster.scala | 4 ++--
 .../scala/org/apache/spark/deploy/worker/Worker.scala    | 4 ++--
 .../scala/org/apache/spark/internal/config/package.scala | 3 +++
 .../scala/org/apache/spark/storage/BlockManager.scala    | 7 ++++---
 core/src/main/scala/org/apache/spark/util/Utils.scala    | 4 ++--
 .../org/apache/spark/ExecutorAllocationManagerSuite.scala | 3 ++-
 .../org/apache/spark/ExternalShuffleServiceSuite.scala   | 5 +++--
 .../spark/deploy/StandaloneDynamicAllocationSuite.scala  | 2 +-
 .../org/apache/spark/scheduler/DAGSchedulerSuite.scala   | 5 +++--
 .../org/apache/spark/storage/BlockManagerSuite.scala     | 4 ++--
 .../mesos/MesosCoarseGrainedSchedulerBackend.scala       | 4 ++--
 .../mesos/MesosCoarseGrainedSchedulerBackendSuite.scala  | 2 +-
 .../spark/deploy/yarn/YarnShuffleIntegrationSuite.scala  | 6 +++---
 15 files changed, 36 insertions(+), 29 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 17b88631bcb4c..c3e5b96a55884 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -25,7 +25,7 @@ import scala.util.control.{ControlThrowable, NonFatal}
 
 import com.codahale.metrics.{Gauge, MetricRegistry}
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.internal.config._
 import org.apache.spark.metrics.source.Source
 import org.apache.spark.scheduler._
@@ -212,7 +212,7 @@ private[spark] class ExecutorAllocationManager(
     }
     // Require external shuffle service for dynamic allocation
     // Otherwise, we may lose shuffle files when killing executors
-    if (!conf.getBoolean("spark.shuffle.service.enabled", false) && !testing) {
+    if (!conf.get(config.SHUFFLE_SERVICE_ENABLED) && !testing) {
       throw new SparkException("Dynamic allocation of executors requires the external " +
         "shuffle service. You may enable this through spark.shuffle.service.enabled.")
     }
diff --git a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
index b59a4fe66587c..f6b3c37f0fe72 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.CountDownLatch
 import scala.collection.JavaConverters._
 
 import org.apache.spark.{SecurityManager, SparkConf}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.network.TransportContext
 import org.apache.spark.network.crypto.AuthServerBootstrap
@@ -45,8 +45,8 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana
   protected val masterMetricsSystem =
     MetricsSystem.createMetricsSystem("shuffleService", sparkConf, securityManager)
 
-  private val enabled = sparkConf.getBoolean("spark.shuffle.service.enabled", false)
-  private val port = sparkConf.getInt("spark.shuffle.service.port", 7337)
+  private val enabled = sparkConf.get(config.SHUFFLE_SERVICE_ENABLED)
+  private val port = sparkConf.get(config.SHUFFLE_SERVICE_PORT)
 
   private val transportConf =
     SparkTransportConf.fromSparkConf(sparkConf, "shuffle", numUsableCores = 0)
@@ -131,7 +131,7 @@ object ExternalShuffleService extends Logging {
 
     // we override this value since this service is started from the command line
     // and we assume the user really wants it to be running
-    sparkConf.set("spark.shuffle.service.enabled", "true")
+    sparkConf.set(config.SHUFFLE_SERVICE_ENABLED.key, "true")
     server = newShuffleService(sparkConf, securityManager)
     server.start()
 
diff --git a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
index 84aa8944fc1c7..be293f88a9d4a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
@@ -22,7 +22,7 @@ import scala.collection.mutable.ArrayBuffer
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.master.Master
 import org.apache.spark.deploy.worker.Worker
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.rpc.RpcEnv
 import org.apache.spark.util.Utils
 
@@ -52,7 +52,7 @@ class LocalSparkCluster(
     // Disable REST server on Master in this mode unless otherwise specified
     val _conf = conf.clone()
       .setIfMissing("spark.master.rest.enabled", "false")
-      .set("spark.shuffle.service.enabled", "false")
+      .set(config.SHUFFLE_SERVICE_ENABLED.key, "false")
 
     /* Start the Master */
     val (rpcEnv, webUiPort, _) = Master.startRpcEnvAndEndpoint(localHostname, 0, 0, _conf)
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index cbd812a05a2c6..d5ea2523c628b 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -36,7 +36,7 @@ import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.ExternalShuffleService
 import org.apache.spark.deploy.master.{DriverState, Master}
 import org.apache.spark.deploy.worker.ui.WorkerWebUI
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.rpc._
 import org.apache.spark.util.{SparkUncaughtExceptionHandler, ThreadUtils, Utils}
@@ -773,7 +773,7 @@ private[deploy] object Worker extends Logging {
     // bound, we may launch no more than one external shuffle service on each host.
     // When this happens, we should give explicit reason of failure instead of fail silently. For
     // more detail see SPARK-20989.
-    val externalShuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)
+    val externalShuffleServiceEnabled = conf.get(config.SHUFFLE_SERVICE_ENABLED)
     val sparkWorkerInstances = scala.sys.env.getOrElse("SPARK_WORKER_INSTANCES", "1").toInt
     require(externalShuffleServiceEnabled == false || sparkWorkerInstances <= 1,
       "Starting multiple workers on one host is failed because we may launch no more than one " +
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 7c2f601c9986a..319e664a19677 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -144,6 +144,9 @@ package object config {
   private[spark] val SHUFFLE_SERVICE_ENABLED =
     ConfigBuilder("spark.shuffle.service.enabled").booleanConf.createWithDefault(false)
 
+  private[spark] val SHUFFLE_SERVICE_PORT =
+    ConfigBuilder("spark.shuffle.service.port").intConf.createWithDefault(7337)
+
   private[spark] val KEYTAB = ConfigBuilder("spark.yarn.keytab")
     .doc("Location of user's keytab.")
     .stringConf.createOptional
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index e7cdfab99b34d..f5c69ad241e3a 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -130,7 +130,7 @@ private[spark] class BlockManager(
   extends BlockDataManager with BlockEvictionHandler with Logging {
 
   private[spark] val externalShuffleServiceEnabled =
-    conf.getBoolean("spark.shuffle.service.enabled", false)
+    conf.get(config.SHUFFLE_SERVICE_ENABLED)
   private val chunkSize =
     conf.getSizeAsBytes("spark.storage.memoryMapLimitForTests", Int.MaxValue.toString).toInt
   private val remoteReadNioBufferConversion =
@@ -165,12 +165,13 @@ private[spark] class BlockManager(
   // Port used by the external shuffle service. In Yarn mode, this may be already be
   // set through the Hadoop configuration as the server is launched in the Yarn NM.
   private val externalShuffleServicePort = {
-    val tmpPort = Utils.getSparkOrYarnConfig(conf, "spark.shuffle.service.port", "7337").toInt
+    val tmpPort = Utils.getSparkOrYarnConfig(conf, config.SHUFFLE_SERVICE_PORT.key,
+      config.SHUFFLE_SERVICE_PORT.defaultValueString).toInt
     if (tmpPort == 0) {
       // for testing, we set "spark.shuffle.service.port" to 0 in the yarn config, so yarn finds
       // an open port. But we still need to tell our spark apps the right port to use. So
       // only if the yarn config has the port set to 0, we prefer the value in the spark config
-      conf.get("spark.shuffle.service.port").toInt
+      conf.get(config.SHUFFLE_SERVICE_PORT.key).toInt
     } else {
       tmpPort
     }
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 935bff92c466f..15c958d3f511e 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -60,7 +60,7 @@ import org.slf4j.Logger
 
 import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.internal.config._
 import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.network.util.JavaUtils
@@ -822,7 +822,7 @@ private[spark] object Utils extends Logging {
    * logic of locating the local directories according to deployment mode.
    */
   def getConfiguredLocalDirs(conf: SparkConf): Array[String] = {
-    val shuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)
+    val shuffleServiceEnabled = conf.get(config.SHUFFLE_SERVICE_ENABLED)
     if (isRunningInYarnContainer(conf)) {
       // If we are in yarn mode, systems can have different disk layouts so we must set it
       // to what Yarn on this system said was available. Note this assumes that Yarn has
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 659ebb60fef86..5c718cb654ce8 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -24,6 +24,7 @@ import org.mockito.Mockito.{mock, never, verify, when}
 import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
 
 import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.internal.config
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.ExternalClusterManager
 import org.apache.spark.scheduler.cluster.ExecutorInfo
@@ -1092,7 +1093,7 @@ class ExecutorAllocationManagerSuite
     val maxExecutors = 2
     val conf = new SparkConf()
       .set("spark.dynamicAllocation.enabled", "true")
-      .set("spark.shuffle.service.enabled", "true")
+      .set(config.SHUFFLE_SERVICE_ENABLED.key, "true")
       .set("spark.dynamicAllocation.minExecutors", minExecutors.toString)
       .set("spark.dynamicAllocation.maxExecutors", maxExecutors.toString)
       .set("spark.dynamicAllocation.initialExecutors", initialExecutors.toString)
diff --git a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
index 472952addf353..462d5f5604ae3 100644
--- a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark
 
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.spark.internal.config
 import org.apache.spark.network.TransportContext
 import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.network.server.TransportServer
@@ -42,8 +43,8 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll {
     server = transportContext.createServer()
 
     conf.set("spark.shuffle.manager", "sort")
-    conf.set("spark.shuffle.service.enabled", "true")
-    conf.set("spark.shuffle.service.port", server.getPort.toString)
+    conf.set(config.SHUFFLE_SERVICE_ENABLED.key, "true")
+    conf.set(config.SHUFFLE_SERVICE_PORT.key, server.getPort.toString)
   }
 
   override def afterAll() {
diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
index 27cc47496c805..a1d2a1283db14 100644
--- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -458,7 +458,7 @@ class StandaloneDynamicAllocationSuite
     val initialExecutorLimit = 1
     val myConf = appConf
       .set("spark.dynamicAllocation.enabled", "true")
-      .set("spark.shuffle.service.enabled", "true")
+      .set(config.SHUFFLE_SERVICE_ENABLED.key, "true")
       .set("spark.dynamicAllocation.initialExecutors", initialExecutorLimit.toString)
     sc = new SparkContext(myConf)
     val appId = sc.applicationId
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index cd00051c56e8d..e0202fe703f82 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -30,6 +30,7 @@ import org.scalatest.time.SpanSugar._
 
 import org.apache.spark._
 import org.apache.spark.broadcast.BroadcastManager
+import org.apache.spark.internal.config
 import org.apache.spark.rdd.RDD
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.shuffle.{FetchFailedException, MetadataFetchFailedException}
@@ -406,7 +407,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     // reset the test context with the right shuffle service config
     afterEach()
     val conf = new SparkConf()
-    conf.set("spark.shuffle.service.enabled", "true")
+    conf.set(config.SHUFFLE_SERVICE_ENABLED.key, "true")
     conf.set("spark.files.fetchFailure.unRegisterOutputOnHost", "true")
     init(conf)
     runEvent(ExecutorAdded("exec-hostA1", "hostA"))
@@ -728,7 +729,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     // reset the test context with the right shuffle service config
     afterEach()
     val conf = new SparkConf()
-    conf.set("spark.shuffle.service.enabled", shuffleServiceOn.toString)
+    conf.set(config.SHUFFLE_SERVICE_ENABLED.key, shuffleServiceOn.toString)
     init(conf)
     assert(sc.env.blockManager.externalShuffleServiceEnabled == shuffleServiceOn)
 
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 08172f0b07b75..dbee1f60d7af0 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -1377,8 +1377,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val (server, shufflePort) = Utils.startServiceOnPort(candidatePort,
       newShuffleServer, conf, "ShuffleServer")
 
-    conf.set("spark.shuffle.service.enabled", "true")
-    conf.set("spark.shuffle.service.port", shufflePort.toString)
+    conf.set(SHUFFLE_SERVICE_ENABLED.key, "true")
+    conf.set(SHUFFLE_SERVICE_PORT.key, shufflePort.toString)
     conf.set(SHUFFLE_REGISTRATION_TIMEOUT.key, "40")
     conf.set(SHUFFLE_REGISTRATION_MAX_ATTEMPTS.key, "1")
     var e = intercept[SparkException] {
diff --git
a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index 1ce2f816dffb2..178de30f0f381 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -102,7 +102,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
 
   // If shuffle service is enabled, the Spark driver will register with the shuffle service.
   // This is for cleaning up shuffle files reliably.
-  private val shuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)
+  private val shuffleServiceEnabled = conf.get(config.SHUFFLE_SERVICE_ENABLED)
 
   // Cores we have acquired with each Mesos task ID
   private val coresByTaskId = new mutable.HashMap[String, Int]
@@ -624,7 +624,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
         "External shuffle client was not instantiated even though shuffle service is enabled.")
       // TODO: Remove this and allow the MesosExternalShuffleService to detect
       // framework termination when new Mesos Framework HTTP API is available.
-      val externalShufflePort = conf.getInt("spark.shuffle.service.port", 7337)
+      val externalShufflePort = conf.get(config.SHUFFLE_SERVICE_PORT)
 
       logDebug(s"Connecting to shuffle service on slave $slaveId, " +
         s"host ${slave.hostname}, port $externalShufflePort for app ${conf.getAppId}")
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index b790c7cd27794..da33d85d8fb2e 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -262,7 +262,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
   }
 
   test("mesos doesn't register twice with the same shuffle service") {
-    setBackend(Map("spark.shuffle.service.enabled" -> "true"))
+    setBackend(Map(SHUFFLE_SERVICE_ENABLED.key -> "true"))
     val (mem, cpu) = (backend.executorMemory(sc), 4)
 
     val offer1 = createOffer("o1", "s1", mem, cpu)
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala
index 01db796096f26..37bccaf0439b4 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala
@@ -44,7 +44,7 @@ class YarnShuffleIntegrationSuite extends BaseYarnClusterSuite {
     yarnConfig.set(YarnConfiguration.NM_AUX_SERVICES, "spark_shuffle")
     yarnConfig.set(YarnConfiguration.NM_AUX_SERVICE_FMT.format("spark_shuffle"),
       classOf[YarnShuffleService].getCanonicalName)
-    yarnConfig.set("spark.shuffle.service.port", "0")
+    yarnConfig.set(SHUFFLE_SERVICE_PORT.key, "0")
     yarnConfig
   }
 
@@ -54,8 +54,8 @@ class YarnShuffleIntegrationSuite extends BaseYarnClusterSuite {
     logInfo("Shuffle service port = " + shuffleServicePort)
     Map(
-      "spark.shuffle.service.enabled" -> "true",
-      "spark.shuffle.service.port" -> shuffleServicePort.toString,
+      SHUFFLE_SERVICE_ENABLED.key -> "true",
+      SHUFFLE_SERVICE_PORT.key -> shuffleServicePort.toString,
       MAX_EXECUTOR_FAILURES.key -> "1"
     )
   }