
[SPARK-27192][CORE] spark.task.cpus should be less than or equal to spark.executor.cores

## What changes were proposed in this pull request?
Check spark.task.cpus against the cores available per executor before creating the TaskScheduler in SparkContext, so that an invalid setting fails fast.

## How was this patch tested?
Unit tests.


Closes #24261 from liutang123/SPARK-27192.

Authored-by: liulijia <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
liutang123 authored and srowen committed Apr 5, 2019
1 parent 982c4c8 commit 39f75b4
Showing 6 changed files with 62 additions and 25 deletions.
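
The net effect of the diff below is fail-fast validation: a context whose spark.task.cpus exceeds what its master can offer per executor now throws at construction time. A minimal sketch of the new behavior, mirroring the test added in SparkContextSuite (the master URL and app name here are illustrative):

```scala
import org.apache.spark.{SparkConf, SparkContext, SparkException}

// local[2] caps cores per executor at 2, but each task asks for 3 cores,
// so SparkContext construction now fails fast with a SparkException.
val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("cpus-per-task-check") // illustrative name
  .set("spark.task.cpus", "3")

try {
  new SparkContext(conf)
} catch {
  case e: SparkException =>
    // Expected message prefix: "spark.task.cpus must be <= ..."
    println(e.getMessage)
}
```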
3 changes: 0 additions & 3 deletions core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -222,9 +222,6 @@ private[spark] class ExecutorAllocationManager(
throw new SparkException("Dynamic allocation of executors requires the external " +
"shuffle service. You may enable this through spark.shuffle.service.enabled.")
}
if (tasksPerExecutorForFullParallelism == 0) {
throw new SparkException(s"${EXECUTOR_CORES.key} must not be < ${CPUS_PER_TASK.key}.")
}

if (executorAllocationRatio > 1.0 || executorAllocationRatio <= 0.0) {
throw new SparkException(
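For context on the guard removed above: tasksPerExecutorForFullParallelism is the integer quotient of executor cores by spark.task.cpus, so it is zero exactly when an executor cannot fit a single task. A small illustration of that arithmetic (standalone values, not Spark API):

```scala
// Illustration only: why the removed guard tripped.
// Integer division truncates, so fewer executor cores than task cpus gives 0.
val executorCores = 1 // e.g. spark.executor.cores
val cpusPerTask = 2   // e.g. spark.task.cpus
val tasksPerExecutorForFullParallelism = executorCores / cpusPerTask
assert(tasksPerExecutorForFullParallelism == 0) // no task could ever be scheduled
```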
10 changes: 0 additions & 10 deletions core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -575,16 +575,6 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
      }
    }

-   if (contains(EXECUTOR_CORES) && contains(CPUS_PER_TASK)) {
-     val executorCores = get(EXECUTOR_CORES)
-     val taskCpus = get(CPUS_PER_TASK)
-
-     if (executorCores < taskCpus) {
-       throw new SparkException(
-         s"${EXECUTOR_CORES.key} must not be less than ${CPUS_PER_TASK.key}.")
-     }
-   }
-
    val encryptionEnabled = get(NETWORK_CRYPTO_ENABLED) || get(SASL_ENCRYPTION_ENABLED)
    require(!encryptionEnabled || get(NETWORK_AUTH_ENABLED),
      s"${NETWORK_AUTH_ENABLED.key} must be enabled when enabling encryption.")
24 changes: 24 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -2665,8 +2665,27 @@ object SparkContext extends Logging {
    // When running locally, don't try to re-execute tasks on failure.
    val MAX_LOCAL_TASK_FAILURES = 1

+   // SPARK-26340: Ensure that executor's core num meets at least one task requirement.
+   def checkCpusPerTask(
+       clusterMode: Boolean,
+       maxCoresPerExecutor: Option[Int]): Unit = {
+     val cpusPerTask = sc.conf.get(CPUS_PER_TASK)
+     if (clusterMode && sc.conf.contains(EXECUTOR_CORES)) {
+       if (sc.conf.get(EXECUTOR_CORES) < cpusPerTask) {
+         throw new SparkException(s"${CPUS_PER_TASK.key}" +
+           s" must be <= ${EXECUTOR_CORES.key} when run on $master.")
+       }
+     } else if (maxCoresPerExecutor.isDefined) {
+       if (maxCoresPerExecutor.get < cpusPerTask) {
+         throw new SparkException(s"Only ${maxCoresPerExecutor.get} cores available per executor" +
+           s" when run on $master, and ${CPUS_PER_TASK.key} must be <= it.")
+       }
+     }
+   }
+
    master match {
      case "local" =>
+       checkCpusPerTask(clusterMode = false, Some(1))
        val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
        val backend = new LocalSchedulerBackend(sc.getConf, scheduler, 1)
        scheduler.initialize(backend)
@@ -2679,6 +2698,7 @@ object SparkContext extends Logging {
        if (threadCount <= 0) {
          throw new SparkException(s"Asked to run locally with $threadCount threads")
        }
+       checkCpusPerTask(clusterMode = false, Some(threadCount))
        val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
        val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
        scheduler.initialize(backend)
@@ -2689,19 +2709,22 @@ object SparkContext extends Logging {
        // local[*, M] means the number of cores on the computer with M failures
        // local[N, M] means exactly N threads with M failures
        val threadCount = if (threads == "*") localCpuCount else threads.toInt
+       checkCpusPerTask(clusterMode = false, Some(threadCount))
        val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true)
        val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
        scheduler.initialize(backend)
        (backend, scheduler)

      case SPARK_REGEX(sparkUrl) =>
+       checkCpusPerTask(clusterMode = true, None)
        val scheduler = new TaskSchedulerImpl(sc)
        val masterUrls = sparkUrl.split(",").map("spark://" + _)
        val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
        scheduler.initialize(backend)
        (backend, scheduler)

      case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
+       checkCpusPerTask(clusterMode = true, Some(coresPerSlave.toInt))
        // Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
        val memoryPerSlaveInt = memoryPerSlave.toInt
        if (sc.executorMemory > memoryPerSlaveInt) {
@@ -2722,6 +2745,7 @@ object SparkContext extends Logging {
        (backend, scheduler)

      case masterUrl =>
+       checkCpusPerTask(clusterMode = true, None)
        val cm = getClusterManager(masterUrl) match {
          case Some(clusterMgr) => clusterMgr
          case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
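Condensing the dispatch above: local masters pass a concrete per-executor core cap, while standalone, local-cluster, and external cluster managers run in cluster mode, where spark.executor.cores is consulted if set. A sketch of the mapping (the master strings are illustrative):

```scala
// (master string, clusterMode, maxCoresPerExecutor) as fed to checkCpusPerTask.
val dispatch: Seq[(String, Boolean, Option[Int])] = Seq(
  ("local", false, Some(1)),                    // one thread, one core
  ("local[8]", false, Some(8)),                 // N threads
  ("local[8, 2]", false, Some(8)),              // N threads, M task failures
  ("spark://host:7077", true, None),            // standalone: rely on EXECUTOR_CORES
  ("local-cluster[2, 4, 1024]", true, Some(4)), // coresPerSlave is a hard cap
  ("yarn", true, None)                          // external cluster manager
)
```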
7 changes: 0 additions & 7 deletions core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -140,13 +140,6 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst
      assert(sc.appName === "My other app")
    }

-   test("creating SparkContext with cpus per tasks bigger than cores per executors") {
-     val conf = new SparkConf(false)
-       .set(EXECUTOR_CORES, 1)
-       .set(CPUS_PER_TASK, 2)
-     intercept[SparkException] { sc = new SparkContext(conf) }
-   }
-
    test("nested property names") {
      // This wasn't supported by some external conf parsing libraries
      System.setProperty("spark.test.a", "a")
21 changes: 21 additions & 0 deletions core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -710,6 +710,27 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
      assert(runningTaskIds.isEmpty)
    }
  }

+  test(s"Avoid setting ${CPUS_PER_TASK.key} unreasonably (SPARK-27192)") {
+    val FAIL_REASON = s"${CPUS_PER_TASK.key} must be <="
+    Seq(
+      ("local", 2, None),
+      ("local[2]", 3, None),
+      ("local[2, 1]", 3, None),
+      ("spark://test-spark-cluster", 2, Option(1)),
+      ("local-cluster[1, 1, 1000]", 2, Option(1)),
+      ("yarn", 2, Option(1))
+    ).foreach { case (master, cpusPerTask, executorCores) =>
+      val conf = new SparkConf()
+      conf.set(CPUS_PER_TASK, cpusPerTask)
+      executorCores.map(executorCores => conf.set(EXECUTOR_CORES, executorCores))
+      val ex = intercept[SparkException] {
+        sc = new SparkContext(master, "test", conf)
+      }
+      assert(ex.getMessage.contains(FAIL_REASON))
+      resetSparkContext()
+    }
+  }
}

object SparkContextSuite {
22 changes: 17 additions & 5 deletions core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -77,7 +77,11 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
  }

  def setupScheduler(confs: (String, String)*): TaskSchedulerImpl = {
-   val conf = new SparkConf().setMaster("local").setAppName("TaskSchedulerImplSuite")
+   setupSchedulerWithMaster("local", confs: _*)
+ }
+
+ def setupSchedulerWithMaster(master: String, confs: (String, String)*): TaskSchedulerImpl = {
+   val conf = new SparkConf().setMaster(master).setAppName("TaskSchedulerImplSuite")
    confs.foreach { case (k, v) => conf.set(k, v) }
    sc = new SparkContext(conf)
    taskScheduler = new TaskSchedulerImpl(sc)
@@ -155,7 +159,9 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B

  test("Scheduler correctly accounts for multiple CPUs per task") {
    val taskCpus = 2
-   val taskScheduler = setupScheduler(config.CPUS_PER_TASK.key -> taskCpus.toString)
+   val taskScheduler = setupSchedulerWithMaster(
+     s"local[$taskCpus]",
+     config.CPUS_PER_TASK.key -> taskCpus.toString)
    // Give zero core offers. Should not generate any tasks
    val zeroCoreWorkerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", 0),
      new WorkerOffer("executor1", "host1", 0))
@@ -185,7 +191,9 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B

  test("Scheduler does not crash when tasks are not serializable") {
    val taskCpus = 2
-   val taskScheduler = setupScheduler(config.CPUS_PER_TASK.key -> taskCpus.toString)
+   val taskScheduler = setupSchedulerWithMaster(
+     s"local[$taskCpus]",
+     config.CPUS_PER_TASK.key -> taskCpus.toString)
    val numFreeCores = 1
    val taskSet = new TaskSet(
      Array(new NotSerializableFakeTask(1, 0), new NotSerializableFakeTask(0, 1)), 0, 0, 0, null)
@@ -1241,7 +1249,9 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B

  test("don't schedule for a barrier taskSet if available slots are less than pending tasks") {
    val taskCpus = 2
-   val taskScheduler = setupScheduler(config.CPUS_PER_TASK.key -> taskCpus.toString)
+   val taskScheduler = setupSchedulerWithMaster(
+     s"local[$taskCpus]",
+     config.CPUS_PER_TASK.key -> taskCpus.toString)

    val numFreeCores = 3
    val workerOffers = IndexedSeq(
@@ -1258,7 +1268,9 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B

  test("schedule tasks for a barrier taskSet if all tasks can be launched together") {
    val taskCpus = 2
-   val taskScheduler = setupScheduler(config.CPUS_PER_TASK.key -> taskCpus.toString)
+   val taskScheduler = setupSchedulerWithMaster(
+     s"local[$taskCpus]",
+     config.CPUS_PER_TASK.key -> taskCpus.toString)

    val numFreeCores = 3
    val workerOffers = IndexedSeq(
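The test-helper change above exists because the default "local" master offers only one core per executor, which the new SparkContext check rejects once spark.task.cpus is raised; sizing the master as local[$taskCpus] keeps the scheduler tests valid. A sketch of the updated calling pattern inside the suite:

```scala
// Inside TaskSchedulerImplSuite: the old one-liner would now throw,
// because "local" offers 1 core while each task requests 2.
//   setupScheduler(config.CPUS_PER_TASK.key -> "2") // fails the new check

// The replacement pins the local master's core count to the task requirement.
val taskCpus = 2
val taskScheduler = setupSchedulerWithMaster(
  s"local[$taskCpus]",
  config.CPUS_PER_TASK.key -> taskCpus.toString)
```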
