diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index 60d04046c7b2a..6fade10b7a3c8 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -222,9 +222,6 @@ private[spark] class ExecutorAllocationManager( throw new SparkException("Dynamic allocation of executors requires the external " + "shuffle service. You may enable this through spark.shuffle.service.enabled.") } - if (tasksPerExecutorForFullParallelism == 0) { - throw new SparkException(s"${EXECUTOR_CORES.key} must not be < ${CPUS_PER_TASK.key}.") - } if (executorAllocationRatio > 1.0 || executorAllocationRatio <= 0.0) { throw new SparkException( diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index 4117aea59b7e4..913a1704ad5ce 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -575,16 +575,6 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria } } - if (contains(EXECUTOR_CORES) && contains(CPUS_PER_TASK)) { - val executorCores = get(EXECUTOR_CORES) - val taskCpus = get(CPUS_PER_TASK) - - if (executorCores < taskCpus) { - throw new SparkException( - s"${EXECUTOR_CORES.key} must not be less than ${CPUS_PER_TASK.key}.") - } - } - val encryptionEnabled = get(NETWORK_CRYPTO_ENABLED) || get(SASL_ENCRYPTION_ENABLED) require(!encryptionEnabled || get(NETWORK_AUTH_ENABLED), s"${NETWORK_AUTH_ENABLED.key} must be enabled when enabling encryption.") diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 4abb18d4aaa73..8b744356daaee 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -2665,8 +2665,27 @@ object SparkContext extends Logging { // When running locally, don't try to re-execute tasks on failure. val MAX_LOCAL_TASK_FAILURES = 1 + // SPARK-26340: Ensure that executor's core num meets at least one task requirement. + def checkCpusPerTask( + clusterMode: Boolean, + maxCoresPerExecutor: Option[Int]): Unit = { + val cpusPerTask = sc.conf.get(CPUS_PER_TASK) + if (clusterMode && sc.conf.contains(EXECUTOR_CORES)) { + if (sc.conf.get(EXECUTOR_CORES) < cpusPerTask) { + throw new SparkException(s"${CPUS_PER_TASK.key}" + + s" must be <= ${EXECUTOR_CORES.key} when run on $master.") + } + } else if (maxCoresPerExecutor.isDefined) { + if (maxCoresPerExecutor.get < cpusPerTask) { + throw new SparkException(s"Only ${maxCoresPerExecutor.get} cores available per executor" + + s" when run on $master, and ${CPUS_PER_TASK.key} must be <= it.") + } + } + } + master match { case "local" => + checkCpusPerTask(clusterMode = false, Some(1)) val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true) val backend = new LocalSchedulerBackend(sc.getConf, scheduler, 1) scheduler.initialize(backend) @@ -2679,6 +2698,7 @@ object SparkContext extends Logging { if (threadCount <= 0) { throw new SparkException(s"Asked to run locally with $threadCount threads") } + checkCpusPerTask(clusterMode = false, Some(threadCount)) val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true) val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount) scheduler.initialize(backend) @@ -2689,12 +2709,14 @@ object SparkContext extends Logging { // local[*, M] means the number of cores on the computer with M failures // local[N, M] means exactly N threads with M failures val threadCount = if (threads == "*") localCpuCount else threads.toInt + checkCpusPerTask(clusterMode = false, Some(threadCount)) val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true) val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount) scheduler.initialize(backend) (backend, scheduler) case SPARK_REGEX(sparkUrl) => + checkCpusPerTask(clusterMode = true, None) val scheduler = new TaskSchedulerImpl(sc) val masterUrls = sparkUrl.split(",").map("spark://" + _) val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls) @@ -2702,6 +2724,7 @@ object SparkContext extends Logging { (backend, scheduler) case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) => + checkCpusPerTask(clusterMode = true, Some(coresPerSlave.toInt)) // Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang. val memoryPerSlaveInt = memoryPerSlave.toInt if (sc.executorMemory > memoryPerSlaveInt) { @@ -2722,6 +2745,7 @@ object SparkContext extends Logging { (backend, scheduler) case masterUrl => + checkCpusPerTask(clusterMode = true, None) val cm = getClusterManager(masterUrl) match { case Some(clusterMgr) => clusterMgr case None => throw new SparkException("Could not parse Master URL: '" + master + "'") diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala index 9f759a5bf74ba..0795790b2ddf3 100644 --- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala @@ -140,13 +140,6 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst assert(sc.appName === "My other app") } - test("creating SparkContext with cpus per tasks bigger than cores per executors") { - val conf = new SparkConf(false) - .set(EXECUTOR_CORES, 1) - .set(CPUS_PER_TASK, 2) - intercept[SparkException] { sc = new SparkContext(conf) } - } - test("nested property names") { // This wasn't supported by some external conf parsing libraries System.setProperty("spark.test.a", "a") diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala index 7a16f7b715e63..3490eaf550ce6 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala @@ -710,6 +710,27 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu assert(runningTaskIds.isEmpty) } } + + test(s"Avoid setting ${CPUS_PER_TASK.key} unreasonably (SPARK-27192)") { + val FAIL_REASON = s"${CPUS_PER_TASK.key} must be <=" + Seq( + ("local", 2, None), + ("local[2]", 3, None), + ("local[2, 1]", 3, None), + ("spark://test-spark-cluster", 2, Option(1)), + ("local-cluster[1, 1, 1000]", 2, Option(1)), + ("yarn", 2, Option(1)) + ).foreach { case (master, cpusPerTask, executorCores) => + val conf = new SparkConf() + conf.set(CPUS_PER_TASK, cpusPerTask) + executorCores.map(executorCores => conf.set(EXECUTOR_CORES, executorCores)) + val ex = intercept[SparkException] { + sc = new SparkContext(master, "test", conf) + } + assert(ex.getMessage.contains(FAIL_REASON)) + resetSparkContext() + } + } } object SparkContextSuite { diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index 1a81f556e0612..115b203df1e40 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -77,7 +77,11 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B } def setupScheduler(confs: (String, String)*): TaskSchedulerImpl = { - val conf = new SparkConf().setMaster("local").setAppName("TaskSchedulerImplSuite") + setupSchedulerWithMaster("local", confs: _*) + } + + def setupSchedulerWithMaster(master: String, confs: (String, String)*): TaskSchedulerImpl = { + val conf = new SparkConf().setMaster(master).setAppName("TaskSchedulerImplSuite") confs.foreach { case (k, v) => conf.set(k, v) } sc = new SparkContext(conf) taskScheduler = new TaskSchedulerImpl(sc) @@ -155,7 +159,9 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B test("Scheduler correctly accounts for multiple CPUs per task") { val taskCpus = 2 - val taskScheduler = setupScheduler(config.CPUS_PER_TASK.key -> taskCpus.toString) + val taskScheduler = setupSchedulerWithMaster( + s"local[$taskCpus]", + config.CPUS_PER_TASK.key -> taskCpus.toString) // Give zero core offers. Should not generate any tasks val zeroCoreWorkerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", 0), new WorkerOffer("executor1", "host1", 0)) @@ -185,7 +191,9 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B test("Scheduler does not crash when tasks are not serializable") { val taskCpus = 2 - val taskScheduler = setupScheduler(config.CPUS_PER_TASK.key -> taskCpus.toString) + val taskScheduler = setupSchedulerWithMaster( + s"local[$taskCpus]", + config.CPUS_PER_TASK.key -> taskCpus.toString) val numFreeCores = 1 val taskSet = new TaskSet( Array(new NotSerializableFakeTask(1, 0), new NotSerializableFakeTask(0, 1)), 0, 0, 0, null) @@ -1241,7 +1249,9 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B test("don't schedule for a barrier taskSet if available slots are less than pending tasks") { val taskCpus = 2 - val taskScheduler = setupScheduler(config.CPUS_PER_TASK.key -> taskCpus.toString) + val taskScheduler = setupSchedulerWithMaster( + s"local[$taskCpus]", + config.CPUS_PER_TASK.key -> taskCpus.toString) val numFreeCores = 3 val workerOffers = IndexedSeq( @@ -1258,7 +1268,9 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B test("schedule tasks for a barrier taskSet if all tasks can be launched together") { val taskCpus = 2 - val taskScheduler = setupScheduler(config.CPUS_PER_TASK.key -> taskCpus.toString) + val taskScheduler = setupSchedulerWithMaster( + s"local[$taskCpus]", + config.CPUS_PER_TASK.key -> taskCpus.toString) val numFreeCores = 3 val workerOffers = IndexedSeq(