[SPARK-25300][CORE] Unified the configuration parameter `spark.shuffle.service.enabled`

## What changes were proposed in this pull request?

The configuration parameter "spark.shuffle.service.enabled" is already defined in `package.scala` as `SHUFFLE_SERVICE_ENABLED`, yet many call sites still reference the string literal, so we can replace those references with the constant. This patch also unifies the configuration parameter "spark.shuffle.service.port" in the same way, adding a `SHUFFLE_SERVICE_PORT` entry.
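
For illustration, this is the pattern the patch applies at every call site — a minimal sketch, noting that `SparkConf.get(ConfigEntry)` is Spark-internal (`private[spark]`), so code like this lives inside the `org.apache.spark` package:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.internal.config

val conf = new SparkConf()

// Before: the key and its default are string/primitive literals,
// repeated at every call site.
val enabledOld: Boolean = conf.getBoolean("spark.shuffle.service.enabled", false)
val portOld: Int = conf.getInt("spark.shuffle.service.port", 7337)

// After: one typed ConfigEntry per parameter; the key and default live
// only in config/package.scala, and conf.get is type-safe.
val enabled: Boolean = conf.get(config.SHUFFLE_SERVICE_ENABLED)
val port: Int = conf.get(config.SHUFFLE_SERVICE_PORT)
```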

## How was this patch tested?
N/A

Closes #22306 from 10110346/unifiedserviceenable.

Authored-by: liuxian <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
10110346 authored and cloud-fan committed Sep 5, 2018
1 parent 103f513 commit ca861fe
Showing 15 changed files with 36 additions and 29 deletions.
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -25,7 +25,7 @@ import scala.util.control.{ControlThrowable, NonFatal}
 
 import com.codahale.metrics.{Gauge, MetricRegistry}
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.internal.config._
 import org.apache.spark.metrics.source.Source
 import org.apache.spark.scheduler._
@@ -212,7 +212,7 @@ private[spark] class ExecutorAllocationManager(
     }
     // Require external shuffle service for dynamic allocation
     // Otherwise, we may lose shuffle files when killing executors
-    if (!conf.getBoolean("spark.shuffle.service.enabled", false) && !testing) {
+    if (!conf.get(config.SHUFFLE_SERVICE_ENABLED) && !testing) {
       throw new SparkException("Dynamic allocation of executors requires the external " +
         "shuffle service. You may enable this through spark.shuffle.service.enabled.")
     }
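
The check above couples dynamic allocation to the external shuffle service, since shuffle files must outlive the executors that wrote them. A minimal sketch of a conf that passes the check, mirroring `ExecutorAllocationManagerSuite` later in this diff; the dynamic-allocation key stays a string literal because this patch unifies only the shuffle-service entries:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.internal.config

// Executors can be killed under dynamic allocation, so the external
// shuffle service must keep serving their shuffle files afterwards.
val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set(config.SHUFFLE_SERVICE_ENABLED.key, "true")
```
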
core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.CountDownLatch
 import scala.collection.JavaConverters._
 
 import org.apache.spark.{SecurityManager, SparkConf}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.network.TransportContext
 import org.apache.spark.network.crypto.AuthServerBootstrap
@@ -45,8 +45,8 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityManager)
   protected val masterMetricsSystem =
     MetricsSystem.createMetricsSystem("shuffleService", sparkConf, securityManager)
 
-  private val enabled = sparkConf.getBoolean("spark.shuffle.service.enabled", false)
-  private val port = sparkConf.getInt("spark.shuffle.service.port", 7337)
+  private val enabled = sparkConf.get(config.SHUFFLE_SERVICE_ENABLED)
+  private val port = sparkConf.get(config.SHUFFLE_SERVICE_PORT)
 
   private val transportConf =
     SparkTransportConf.fromSparkConf(sparkConf, "shuffle", numUsableCores = 0)
@@ -131,7 +131,7 @@ object ExternalShuffleService extends Logging {
 
     // we override this value since this service is started from the command line
     // and we assume the user really wants it to be running
-    sparkConf.set("spark.shuffle.service.enabled", "true")
+    sparkConf.set(config.SHUFFLE_SERVICE_ENABLED.key, "true")
     server = newShuffleService(sparkConf, securityManager)
     server.start()
 
core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
@@ -22,7 +22,7 @@ import scala.collection.mutable.ArrayBuffer
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.master.Master
 import org.apache.spark.deploy.worker.Worker
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.rpc.RpcEnv
 import org.apache.spark.util.Utils
 
@@ -52,7 +52,7 @@ class LocalSparkCluster(
   // Disable REST server on Master in this mode unless otherwise specified
   val _conf = conf.clone()
     .setIfMissing("spark.master.rest.enabled", "false")
-    .set("spark.shuffle.service.enabled", "false")
+    .set(config.SHUFFLE_SERVICE_ENABLED.key, "false")
 
   /* Start the Master */
   val (rpcEnv, webUiPort, _) = Master.startRpcEnvAndEndpoint(localHostname, 0, 0, _conf)
core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -36,7 +36,7 @@ import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.ExternalShuffleService
 import org.apache.spark.deploy.master.{DriverState, Master}
 import org.apache.spark.deploy.worker.ui.WorkerWebUI
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.rpc._
 import org.apache.spark.util.{SparkUncaughtExceptionHandler, ThreadUtils, Utils}
@@ -773,7 +773,7 @@ private[deploy] object Worker extends Logging {
     // bound, we may launch no more than one external shuffle service on each host.
     // When this happens, we should give explicit reason of failure instead of fail silently. For
     // more detail see SPARK-20989.
-    val externalShuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)
+    val externalShuffleServiceEnabled = conf.get(config.SHUFFLE_SERVICE_ENABLED)
     val sparkWorkerInstances = scala.sys.env.getOrElse("SPARK_WORKER_INSTANCES", "1").toInt
     require(externalShuffleServiceEnabled == false || sparkWorkerInstances <= 1,
       "Starting multiple workers on one host is failed because we may launch no more than one " +
core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -144,6 +144,9 @@ package object config {
   private[spark] val SHUFFLE_SERVICE_ENABLED =
     ConfigBuilder("spark.shuffle.service.enabled").booleanConf.createWithDefault(false)
 
+  private[spark] val SHUFFLE_SERVICE_PORT =
+    ConfigBuilder("spark.shuffle.service.port").intConf.createWithDefault(7337)
+
   private[spark] val KEYTAB = ConfigBuilder("spark.yarn.keytab")
     .doc("Location of user's keytab.")
     .stringConf.createOptional
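
Each entry above is a typed `ConfigEntry` that bundles its key, value type, and default in one place. A small sketch of the accessors the rest of this diff relies on (the values in comments follow from the definitions above):

```scala
import org.apache.spark.internal.config

// The string key, for call sites that still go through untyped conf.set/get:
val portKey: String = config.SHUFFLE_SERVICE_PORT.key
// => "spark.shuffle.service.port"

// The default rendered as a string, used by BlockManager below when it
// consults Utils.getSparkOrYarnConfig:
val portDefault: String = config.SHUFFLE_SERVICE_PORT.defaultValueString
// => "7337"
```
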
core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -130,7 +130,7 @@ private[spark] class BlockManager(
   extends BlockDataManager with BlockEvictionHandler with Logging {
 
   private[spark] val externalShuffleServiceEnabled =
-    conf.getBoolean("spark.shuffle.service.enabled", false)
+    conf.get(config.SHUFFLE_SERVICE_ENABLED)
   private val chunkSize =
     conf.getSizeAsBytes("spark.storage.memoryMapLimitForTests", Int.MaxValue.toString).toInt
   private val remoteReadNioBufferConversion =
@@ -165,12 +165,13 @@ private[spark] class BlockManager(
   // Port used by the external shuffle service. In Yarn mode, this may be already be
   // set through the Hadoop configuration as the server is launched in the Yarn NM.
   private val externalShuffleServicePort = {
-    val tmpPort = Utils.getSparkOrYarnConfig(conf, "spark.shuffle.service.port", "7337").toInt
+    val tmpPort = Utils.getSparkOrYarnConfig(conf, config.SHUFFLE_SERVICE_PORT.key,
+      config.SHUFFLE_SERVICE_PORT.defaultValueString).toInt
     if (tmpPort == 0) {
       // for testing, we set "spark.shuffle.service.port" to 0 in the yarn config, so yarn finds
       // an open port. But we still need to tell our spark apps the right port to use. So
       // only if the yarn config has the port set to 0, we prefer the value in the spark config
-      conf.get("spark.shuffle.service.port").toInt
+      conf.get(config.SHUFFLE_SERVICE_PORT.key).toInt
     } else {
       tmpPort
     }
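
The fallback above deserves a note: in YARN mode the NodeManager launches the shuffle service, and tests set the port to 0 in the YARN config so an ephemeral port is chosen. A hypothetical standalone rendering of that decision (the names `resolveShufflePort` and `yarnPort` are illustrative, not part of this patch):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.internal.config

// yarnPort is what Utils.getSparkOrYarnConfig resolved. 0 means the YARN
// config deliberately asked for an ephemeral port (test mode), so the
// Spark conf's own spark.shuffle.service.port value wins instead.
def resolveShufflePort(conf: SparkConf, yarnPort: Int): Int =
  if (yarnPort == 0) conf.get(config.SHUFFLE_SERVICE_PORT.key).toInt else yarnPort
```
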
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -60,7 +60,7 @@ import org.slf4j.Logger
 
 import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.internal.config._
 import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.network.util.JavaUtils
@@ -822,7 +822,7 @@ private[spark] object Utils extends Logging {
    * logic of locating the local directories according to deployment mode.
    */
  def getConfiguredLocalDirs(conf: SparkConf): Array[String] = {
-    val shuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)
+    val shuffleServiceEnabled = conf.get(config.SHUFFLE_SERVICE_ENABLED)
    if (isRunningInYarnContainer(conf)) {
      // If we are in yarn mode, systems can have different disk layouts so we must set it
      // to what Yarn on this system said was available. Note this assumes that Yarn has
core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -24,6 +24,7 @@ import org.mockito.Mockito.{mock, never, verify, when}
 import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
 
 import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.internal.config
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.ExternalClusterManager
 import org.apache.spark.scheduler.cluster.ExecutorInfo
@@ -1092,7 +1093,7 @@ class ExecutorAllocationManagerSuite
     val maxExecutors = 2
     val conf = new SparkConf()
       .set("spark.dynamicAllocation.enabled", "true")
-      .set("spark.shuffle.service.enabled", "true")
+      .set(config.SHUFFLE_SERVICE_ENABLED.key, "true")
       .set("spark.dynamicAllocation.minExecutors", minExecutors.toString)
       .set("spark.dynamicAllocation.maxExecutors", maxExecutors.toString)
       .set("spark.dynamicAllocation.initialExecutors", initialExecutors.toString)
core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark
 
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.spark.internal.config
 import org.apache.spark.network.TransportContext
 import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.network.server.TransportServer
@@ -42,8 +43,8 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll {
     server = transportContext.createServer()
 
     conf.set("spark.shuffle.manager", "sort")
-    conf.set("spark.shuffle.service.enabled", "true")
-    conf.set("spark.shuffle.service.port", server.getPort.toString)
+    conf.set(config.SHUFFLE_SERVICE_ENABLED.key, "true")
+    conf.set(config.SHUFFLE_SERVICE_PORT.key, server.getPort.toString)
   }
 
   override def afterAll() {
core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -458,7 +458,7 @@ class StandaloneDynamicAllocationSuite
     val initialExecutorLimit = 1
     val myConf = appConf
       .set("spark.dynamicAllocation.enabled", "true")
-      .set("spark.shuffle.service.enabled", "true")
+      .set(config.SHUFFLE_SERVICE_ENABLED.key, "true")
       .set("spark.dynamicAllocation.initialExecutors", initialExecutorLimit.toString)
     sc = new SparkContext(myConf)
     val appId = sc.applicationId
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -30,6 +30,7 @@ import org.scalatest.time.SpanSugar._
 
 import org.apache.spark._
 import org.apache.spark.broadcast.BroadcastManager
+import org.apache.spark.internal.config
 import org.apache.spark.rdd.RDD
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.shuffle.{FetchFailedException, MetadataFetchFailedException}
@@ -406,7 +407,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLimits
     // reset the test context with the right shuffle service config
     afterEach()
     val conf = new SparkConf()
-    conf.set("spark.shuffle.service.enabled", "true")
+    conf.set(config.SHUFFLE_SERVICE_ENABLED.key, "true")
     conf.set("spark.files.fetchFailure.unRegisterOutputOnHost", "true")
     init(conf)
     runEvent(ExecutorAdded("exec-hostA1", "hostA"))
@@ -728,7 +729,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLimits
     // reset the test context with the right shuffle service config
     afterEach()
     val conf = new SparkConf()
-    conf.set("spark.shuffle.service.enabled", shuffleServiceOn.toString)
+    conf.set(config.SHUFFLE_SERVICE_ENABLED.key, shuffleServiceOn.toString)
     init(conf)
     assert(sc.env.blockManager.externalShuffleServiceEnabled == shuffleServiceOn)
 
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -1377,8 +1377,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach
     val (server, shufflePort) = Utils.startServiceOnPort(candidatePort,
       newShuffleServer, conf, "ShuffleServer")
 
-    conf.set("spark.shuffle.service.enabled", "true")
-    conf.set("spark.shuffle.service.port", shufflePort.toString)
+    conf.set(SHUFFLE_SERVICE_ENABLED.key, "true")
+    conf.set(SHUFFLE_SERVICE_PORT.key, shufflePort.toString)
     conf.set(SHUFFLE_REGISTRATION_TIMEOUT.key, "40")
     conf.set(SHUFFLE_REGISTRATION_MAX_ATTEMPTS.key, "1")
     var e = intercept[SparkException] {
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -102,7 +102,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
 
   // If shuffle service is enabled, the Spark driver will register with the shuffle service.
   // This is for cleaning up shuffle files reliably.
-  private val shuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)
+  private val shuffleServiceEnabled = conf.get(config.SHUFFLE_SERVICE_ENABLED)
 
   // Cores we have acquired with each Mesos task ID
   private val coresByTaskId = new mutable.HashMap[String, Int]
@@ -624,7 +624,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
       "External shuffle client was not instantiated even though shuffle service is enabled.")
     // TODO: Remove this and allow the MesosExternalShuffleService to detect
     // framework termination when new Mesos Framework HTTP API is available.
-    val externalShufflePort = conf.getInt("spark.shuffle.service.port", 7337)
+    val externalShufflePort = conf.get(config.SHUFFLE_SERVICE_PORT)
 
     logDebug(s"Connecting to shuffle service on slave $slaveId, " +
       s"host ${slave.hostname}, port $externalShufflePort for app ${conf.getAppId}")
resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -262,7 +262,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
   }
 
   test("mesos doesn't register twice with the same shuffle service") {
-    setBackend(Map("spark.shuffle.service.enabled" -> "true"))
+    setBackend(Map(SHUFFLE_SERVICE_ENABLED.key -> "true"))
     val (mem, cpu) = (backend.executorMemory(sc), 4)
 
     val offer1 = createOffer("o1", "s1", mem, cpu)
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala
@@ -44,7 +44,7 @@ class YarnShuffleIntegrationSuite extends BaseYarnClusterSuite {
     yarnConfig.set(YarnConfiguration.NM_AUX_SERVICES, "spark_shuffle")
     yarnConfig.set(YarnConfiguration.NM_AUX_SERVICE_FMT.format("spark_shuffle"),
       classOf[YarnShuffleService].getCanonicalName)
-    yarnConfig.set("spark.shuffle.service.port", "0")
+    yarnConfig.set(SHUFFLE_SERVICE_PORT.key, "0")
     yarnConfig
   }
 
@@ -54,8 +54,8 @@ class YarnShuffleIntegrationSuite extends BaseYarnClusterSuite {
     logInfo("Shuffle service port = " + shuffleServicePort)
 
     Map(
-      "spark.shuffle.service.enabled" -> "true",
-      "spark.shuffle.service.port" -> shuffleServicePort.toString,
+      SHUFFLE_SERVICE_ENABLED.key -> "true",
+      SHUFFLE_SERVICE_PORT.key -> shuffleServicePort.toString,
       MAX_EXECUTOR_FAILURES.key -> "1"
     )
   }
