Skip to content

Commit

Permalink
Config for hard cpu limit on pods; default unlimited (apache#356)
Browse files Browse the repository at this point in the history
  • Loading branch information
sandflee authored and ash211 committed Jun 23, 2017
1 parent 08fe944 commit 8b3248f
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 2 deletions.
14 changes: 14 additions & 0 deletions docs/running-on-kubernetes.md
Original file line number Diff line number Diff line change
Expand Up @@ -718,6 +718,20 @@ from the other deployment modes. See the [configuration page](configuration.html
Docker image pull policy used when pulling Docker images with Kubernetes.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.driver.limit.cores</code></td>
<td>(none)</td>
<td>
Specify the hard cpu limit for the driver pod
</td>
</tr>
<tr>
<td><code>spark.kubernetes.executor.limit.cores</code></td>
<td>(none)</td>
<td>
Specify the hard cpu limit for a single executor pod
</td>
</tr>
</table>


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,18 @@ package object config extends Logging {
.stringConf
.createOptional

private[spark] val KUBERNETES_DRIVER_LIMIT_CORES =
ConfigBuilder("spark.kubernetes.driver.limit.cores")
.doc("Specify the hard cpu limit for the driver pod")
.stringConf
.createOptional

private[spark] val KUBERNETES_EXECUTOR_LIMIT_CORES =
ConfigBuilder("spark.kubernetes.executor.limit.cores")
.doc("Specify the hard cpu limit for a single executor pod")
.stringConf
.createOptional

private[spark] def resolveK8sMaster(rawMasterString: String): String = {
if (!rawMasterString.startsWith("k8s://")) {
throw new IllegalArgumentException("Master URL should start with k8s:// in Kubernetes mode.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ private[spark] class Client(

// CPU settings
private val driverCpuCores = sparkConf.getOption("spark.driver.cores").getOrElse("1")
private val driverLimitCores = sparkConf.getOption(KUBERNETES_DRIVER_LIMIT_CORES.key)

// Memory settings
private val driverMemoryMb = sparkConf.get(org.apache.spark.internal.config.DRIVER_MEMORY)
Expand Down Expand Up @@ -139,7 +140,6 @@ private[spark] class Client(
.endEnv()
.withNewResources()
.addToRequests("cpu", driverCpuQuantity)
.addToLimits("cpu", driverCpuQuantity)
.addToRequests("memory", driverMemoryQuantity)
.addToLimits("memory", driverMemoryLimitQuantity)
.endResources()
Expand All @@ -156,6 +156,21 @@ private[spark] class Client(
.addToContainers(driverContainer)
.endSpec()

driverLimitCores.map {
limitCores =>
val driverCpuLimitQuantity = new QuantityBuilder(false)
.withAmount(limitCores)
.build()
basePod
.editSpec()
.editFirstContainer()
.editResources
.addToLimits("cpu", driverCpuLimitQuantity)
.endResources()
.endContainer()
.endSpec()
}

val maybeSubmittedResourceIdentifiers = initContainerComponentsProvider
.provideInitContainerSubmittedDependencyUploader(allDriverLabels)
.map { uploader =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
private val executorMemoryWithOverhead = executorMemoryMb + memoryOverheadMb

private val executorCores = conf.getOption("spark.executor.cores").getOrElse("1")
private val executorLimitCores = conf.getOption(KUBERNETES_EXECUTOR_LIMIT_CORES.key)

private implicit val requestExecutorContext = ExecutionContext.fromExecutorService(
ThreadUtils.newDaemonCachedThreadPool("kubernetes-executor-requests"))
Expand Down Expand Up @@ -438,14 +439,28 @@ private[spark] class KubernetesClusterSchedulerBackend(
.addToRequests("memory", executorMemoryQuantity)
.addToLimits("memory", executorMemoryLimitQuantity)
.addToRequests("cpu", executorCpuQuantity)
.addToLimits("cpu", executorCpuQuantity)
.endResources()
.addAllToEnv(requiredEnv.asJava)
.addToEnv(executorExtraClasspathEnv.toSeq: _*)
.withPorts(requiredPorts.asJava)
.endContainer()
.endSpec()

executorLimitCores.map {
limitCores =>
val executorCpuLimitQuantity = new QuantityBuilder(false)
.withAmount(limitCores)
.build()
basePodBuilder
.editSpec()
.editFirstContainer()
.editResources
.addToLimits("cpu", executorCpuLimitQuantity)
.endResources()
.endContainer()
.endSpec()
}

val withMaybeShuffleConfigPodBuilder = shuffleServiceConfig
.map { config =>
config.shuffleDirs.foldLeft(basePodBuilder) { (builder, dir) =>
Expand Down

0 comments on commit 8b3248f

Please sign in to comment.