diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/ConfigurationUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/ConfigurationUtils.scala
index 23832e0850c10..6d443d1774107 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/ConfigurationUtils.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/ConfigurationUtils.scala
@@ -35,16 +35,18 @@ private[spark] object ConfigurationUtils {
   }
 
   def requireBothOrNeitherDefined(
-    opt1: Option[_],
-    opt2: Option[_],
-    errMessageWhenFirstIsMissing: String,
-    errMessageWhenSecondIsMissing: String): Unit = {
+      opt1: Option[_],
+      opt2: Option[_],
+      errMessageWhenFirstIsMissing: String,
+      errMessageWhenSecondIsMissing: String): Unit = {
     requireSecondIfFirstIsDefined(opt1, opt2, errMessageWhenSecondIsMissing)
     requireSecondIfFirstIsDefined(opt2, opt1, errMessageWhenFirstIsMissing)
   }
 
   def requireSecondIfFirstIsDefined(
-    opt1: Option[_], opt2: Option[_], errMessageWhenSecondIsMissing: String): Unit = {
+      opt1: Option[_],
+      opt2: Option[_],
+      errMessageWhenSecondIsMissing: String): Unit = {
     opt1.foreach { _ =>
       require(opt2.isDefined, errMessageWhenSecondIsMissing)
     }
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala
index 864c26959f929..444144b69c341 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala
@@ -49,38 +49,38 @@ private[spark] object SparkKubernetesClientFactory {
       .orElse(maybeServiceAccountToken)
     val oauthTokenValue = sparkConf.getOption(oauthTokenConf)
     ConfigurationUtils.requireNandDefined(
-        oauthTokenFile,
-        oauthTokenValue,
-        s"Cannot specify OAuth token through both a file $oauthTokenFileConf and a" +
-            s" value $oauthTokenConf.")
+      oauthTokenFile,
+      oauthTokenValue,
+      s"Cannot specify OAuth token through both a file $oauthTokenFileConf and a" +
+        s" value $oauthTokenConf.")
 
     val caCertFile = sparkConf
-        .getOption(s"$kubernetesAuthConfPrefix.$CA_CERT_FILE_CONF_SUFFIX")
-        .orElse(maybeServiceAccountCaCert.map(_.getAbsolutePath))
+      .getOption(s"$kubernetesAuthConfPrefix.$CA_CERT_FILE_CONF_SUFFIX")
+      .orElse(maybeServiceAccountCaCert.map(_.getAbsolutePath))
     val clientKeyFile = sparkConf
-        .getOption(s"$kubernetesAuthConfPrefix.$CLIENT_KEY_FILE_CONF_SUFFIX")
+      .getOption(s"$kubernetesAuthConfPrefix.$CLIENT_KEY_FILE_CONF_SUFFIX")
     val clientCertFile = sparkConf
-        .getOption(s"$kubernetesAuthConfPrefix.$CLIENT_CERT_FILE_CONF_SUFFIX")
+      .getOption(s"$kubernetesAuthConfPrefix.$CLIENT_CERT_FILE_CONF_SUFFIX")
     val dispatcher = new Dispatcher(
-        ThreadUtils.newDaemonCachedThreadPool("kubernetes-dispatcher"))
+      ThreadUtils.newDaemonCachedThreadPool("kubernetes-dispatcher"))
     val config = new ConfigBuilder()
-        .withApiVersion("v1")
-        .withMasterUrl(master)
-        .withWebsocketPingInterval(0)
-        .withOption(oauthTokenValue) {
-          (token, configBuilder) => configBuilder.withOauthToken(token)
-        }.withOption(oauthTokenFile) {
-          (file, configBuilder) =>
-              configBuilder.withOauthToken(Files.toString(file, Charsets.UTF_8))
-        }.withOption(caCertFile) {
-          (file, configBuilder) => configBuilder.withCaCertFile(file)
-        }.withOption(clientKeyFile) {
-          (file, configBuilder) => configBuilder.withClientKeyFile(file)
-        }.withOption(clientCertFile) {
-          (file, configBuilder) => configBuilder.withClientCertFile(file)
-        }.withOption(namespace) {
-          (ns, configBuilder) => configBuilder.withNamespace(ns)
-        }.build()
+      .withApiVersion("v1")
+      .withMasterUrl(master)
+      .withWebsocketPingInterval(0)
+      .withOption(oauthTokenValue) {
+        (token, configBuilder) => configBuilder.withOauthToken(token)
+      }.withOption(oauthTokenFile) {
+        (file, configBuilder) =>
+          configBuilder.withOauthToken(Files.toString(file, Charsets.UTF_8))
+      }.withOption(caCertFile) {
+        (file, configBuilder) => configBuilder.withCaCertFile(file)
+      }.withOption(clientKeyFile) {
+        (file, configBuilder) => configBuilder.withClientKeyFile(file)
+      }.withOption(clientCertFile) {
+        (file, configBuilder) => configBuilder.withClientCertFile(file)
+      }.withOption(namespace) {
+        (ns, configBuilder) => configBuilder.withNamespace(ns)
+      }.build()
     val baseHttpClient = HttpClientUtils.createHttpClient(config)
     val httpClientWithCustomDispatcher = baseHttpClient.newBuilder()
       .dispatcher(dispatcher)
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
index 9c2c7da8cf224..05abc3c17b1f7 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
@@ -26,8 +26,10 @@ import org.apache.spark.deploy.k8s.config._
 import org.apache.spark.deploy.k8s.constants._
 import org.apache.spark.util.Utils
 
-// Configures executor pods. Construct one of these with a SparkConf to set up properties that are
-// common across all executors. Then, pass in dynamic parameters into createExecutorPod.
+/**
+ * Configures executor pods. Construct one of these with a SparkConf to set up properties that are
+ * common across all executors. Then, pass in dynamic parameters into createExecutorPod.
+ */
 private[spark] trait ExecutorPodFactory {
   def createExecutorPod(
       executorId: String,
@@ -44,52 +46,52 @@ private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf)
   import ExecutorPodFactoryImpl._
 
   private val executorExtraClasspath = sparkConf.get(
-      org.apache.spark.internal.config.EXECUTOR_CLASS_PATH)
+    org.apache.spark.internal.config.EXECUTOR_CLASS_PATH)
   private val executorJarsDownloadDir = sparkConf.get(INIT_CONTAINER_JARS_DOWNLOAD_LOCATION)
 
   private val executorLabels = ConfigurationUtils.parsePrefixedKeyValuePairs(
-      sparkConf,
-      KUBERNETES_EXECUTOR_LABEL_PREFIX,
-      "executor label")
+    sparkConf,
+    KUBERNETES_EXECUTOR_LABEL_PREFIX,
+    "executor label")
   require(
-      !executorLabels.contains(SPARK_APP_ID_LABEL),
-      s"Custom executor labels cannot contain $SPARK_APP_ID_LABEL as it is reserved for Spark.")
+    !executorLabels.contains(SPARK_APP_ID_LABEL),
+    s"Custom executor labels cannot contain $SPARK_APP_ID_LABEL as it is reserved for Spark.")
   require(
-      !executorLabels.contains(SPARK_EXECUTOR_ID_LABEL),
-      s"Custom executor labels cannot contain $SPARK_EXECUTOR_ID_LABEL as it is reserved for" +
-          s" Spark.")
+    !executorLabels.contains(SPARK_EXECUTOR_ID_LABEL),
+    s"Custom executor labels cannot contain $SPARK_EXECUTOR_ID_LABEL as it is reserved for" +
+      s" Spark.")
 
   private val executorAnnotations =
-      ConfigurationUtils.parsePrefixedKeyValuePairs (
-          sparkConf,
-          KUBERNETES_EXECUTOR_ANNOTATION_PREFIX,
-          "executor annotation")
+    ConfigurationUtils.parsePrefixedKeyValuePairs(
+      sparkConf,
+      KUBERNETES_EXECUTOR_ANNOTATION_PREFIX,
+      "executor annotation")
   private val nodeSelector =
-      ConfigurationUtils.parsePrefixedKeyValuePairs(
-          sparkConf,
-          KUBERNETES_NODE_SELECTOR_PREFIX,
-          "node selector")
+    ConfigurationUtils.parsePrefixedKeyValuePairs(
+      sparkConf,
+      KUBERNETES_NODE_SELECTOR_PREFIX,
+      "node selector")
 
   private val executorDockerImage = sparkConf.get(EXECUTOR_DOCKER_IMAGE)
   private val dockerImagePullPolicy = sparkConf.get(DOCKER_IMAGE_PULL_POLICY)
   private val executorPort = sparkConf.getInt("spark.executor.port", DEFAULT_STATIC_PORT)
   private val blockmanagerPort = sparkConf
-      .getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT)
+    .getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT)
 
   private val kubernetesDriverPodName = sparkConf
-      .get(KUBERNETES_DRIVER_POD_NAME)
-      .getOrElse(throw new SparkException("Must specify the driver pod name"))
+    .get(KUBERNETES_DRIVER_POD_NAME)
+    .getOrElse(throw new SparkException("Must specify the driver pod name"))
 
   private val executorPodNamePrefix = sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX)
 
   private val executorMemoryMiB = sparkConf.get(org.apache.spark.internal.config.EXECUTOR_MEMORY)
   private val executorMemoryString = sparkConf.get(
-      org.apache.spark.internal.config.EXECUTOR_MEMORY.key,
-      org.apache.spark.internal.config.EXECUTOR_MEMORY.defaultValueString)
+    org.apache.spark.internal.config.EXECUTOR_MEMORY.key,
+    org.apache.spark.internal.config.EXECUTOR_MEMORY.defaultValueString)
   private val memoryOverheadMiB = sparkConf
-      .get(KUBERNETES_EXECUTOR_MEMORY_OVERHEAD)
-      .getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMiB).toInt,
-          MEMORY_OVERHEAD_MIN_MIB))
+    .get(KUBERNETES_EXECUTOR_MEMORY_OVERHEAD)
+    .getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMiB).toInt,
+      MEMORY_OVERHEAD_MIN_MIB))
   private val executorMemoryWithOverhead = executorMemoryMiB + memoryOverheadMiB
 
   private val executorCores = sparkConf.getDouble("spark.executor.cores", 1d)
@@ -109,10 +111,10 @@ private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf)
     // executorId and applicationId
     val hostname = name.substring(Math.max(0, name.length - 63))
     val resolvedExecutorLabels = Map(
-        SPARK_EXECUTOR_ID_LABEL -> executorId,
-        SPARK_APP_ID_LABEL -> applicationId,
-        SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE) ++
-        executorLabels
+      SPARK_EXECUTOR_ID_LABEL -> executorId,
+      SPARK_APP_ID_LABEL -> applicationId,
+      SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE) ++
+      executorLabels
     val executorMemoryQuantity = new QuantityBuilder(false)
       .withAmount(s"${executorMemoryMiB}Mi")
       .build()
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
index 386562fb9b3fd..74aac6d211f40 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
@@ -41,25 +41,25 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
     val sparkConf = sc.getConf
 
     val kubernetesClient = SparkKubernetesClientFactory.createKubernetesClient(
-        KUBERNETES_MASTER_INTERNAL_URL,
-        Some(sparkConf.get(KUBERNETES_NAMESPACE)),
-        APISERVER_AUTH_DRIVER_MOUNTED_CONF_PREFIX,
-        sparkConf,
-        Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)),
-        Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH)))
+      KUBERNETES_MASTER_INTERNAL_URL,
+      Some(sparkConf.get(KUBERNETES_NAMESPACE)),
+      APISERVER_AUTH_DRIVER_MOUNTED_CONF_PREFIX,
+      sparkConf,
+      Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)),
+      Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH)))
 
     val executorPodFactory = new ExecutorPodFactoryImpl(sparkConf)
     val allocatorExecutor = ThreadUtils
-        .newDaemonSingleThreadScheduledExecutor("kubernetes-pod-allocator")
+      .newDaemonSingleThreadScheduledExecutor("kubernetes-pod-allocator")
     val requestExecutorsService = ThreadUtils.newDaemonCachedThreadPool(
-        "kubernetes-executor-requests")
+      "kubernetes-executor-requests")
     new KubernetesClusterSchedulerBackend(
-        scheduler.asInstanceOf[TaskSchedulerImpl],
-        sc.env.rpcEnv,
-        executorPodFactory,
-        kubernetesClient,
-        allocatorExecutor,
-        requestExecutorsService)
+      scheduler.asInstanceOf[TaskSchedulerImpl],
+      sc.env.rpcEnv,
+      executorPodFactory,
+      kubernetesClient,
+      allocatorExecutor,
+      requestExecutorsService)
   }
 
   override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
index 8ddfd61b98062..3afc1c60d7ebb 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
@@ -65,7 +65,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
     .getOrElse(
       throw new SparkException("Must specify the driver pod name"))
   private implicit val requestExecutorContext = ExecutionContext.fromExecutorService(
-      requestExecutorsService)
+    requestExecutorsService)
 
   private val driverPod = try {
     kubernetesClient.pods()
@@ -89,9 +89,9 @@ private[spark] class KubernetesClusterSchedulerBackend(
   protected var totalExpectedExecutors = new AtomicInteger(0)
 
   private val driverUrl = RpcEndpointAddress(
-      conf.get("spark.driver.host"),
-      conf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT),
-      CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
+    conf.get("spark.driver.host"),
+    conf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT),
+    CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
 
   private val initialExecutors = getInitialTargetExecutorNumber()
 
@@ -121,7 +121,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
       } else {
         val nodeToLocalTaskCount = getNodesWithLocalTaskCounts
         for (i <- 0 until math.min(
-            totalExpectedExecutors.get - runningExecutorsToPods.size, podAllocationSize)) {
+          totalExpectedExecutors.get - runningExecutorsToPods.size, podAllocationSize)) {
           val (executorId, pod) = allocateNewExecutorPod(nodeToLocalTaskCount)
           runningExecutorsToPods.put(executorId, pod)
           runningPodsToExecutors.put(pod.getMetadata.getName, executorId)
@@ -202,13 +202,13 @@ private[spark] class KubernetesClusterSchedulerBackend(
   override def start(): Unit = {
     super.start()
     executorWatchResource.set(
-        kubernetesClient
-            .pods()
-            .withLabel(SPARK_APP_ID_LABEL, applicationId())
-            .watch(new ExecutorPodsWatcher()))
+      kubernetesClient
+        .pods()
+        .withLabel(SPARK_APP_ID_LABEL, applicationId())
+        .watch(new ExecutorPodsWatcher()))
 
     allocatorExecutor.scheduleWithFixedDelay(
-        allocatorRunnable, 0L, podAllocationInterval, TimeUnit.SECONDS)
+      allocatorRunnable, 0L, podAllocationInterval, TimeUnit.SECONDS)
 
     if (!Utils.isDynamicAllocationEnabled(conf)) {
       doRequestTotalExecutors(initialExecutors)
@@ -281,12 +281,12 @@ private[spark] class KubernetesClusterSchedulerBackend(
   private def allocateNewExecutorPod(nodeToLocalTaskCount: Map[String, Int]): (String, Pod) = {
     val executorId = EXECUTOR_ID_COUNTER.incrementAndGet().toString
     val executorPod = executorPodFactory.createExecutorPod(
-        executorId,
-        applicationId(),
-        driverUrl,
-        conf.getExecutorEnv,
-        driverPod,
-        nodeToLocalTaskCount)
+      executorId,
+      applicationId(),
+      driverUrl,
+      conf.getExecutorEnv,
+      driverPod,
+      nodeToLocalTaskCount)
     try {
       (executorId, kubernetesClient.pods.create(executorPod))
     } catch {
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
index fc67f1e8fd69e..6c2a6b8aef2bf 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
@@ -138,12 +138,12 @@ private[spark] class KubernetesClusterSchedulerBackendSuite
   before {
     MockitoAnnotations.initMocks(this)
     sparkConf = new SparkConf()
-        .set("spark.app.id", APP_ID)
-        .set(KUBERNETES_DRIVER_POD_NAME, DRIVER_POD_NAME)
-        .set(KUBERNETES_NAMESPACE, NAMESPACE)
-        .set("spark.driver.host", SPARK_DRIVER_HOST)
-        .set("spark.driver.port", SPARK_DRIVER_PORT.toString)
-        .set(KUBERNETES_ALLOCATION_BATCH_DELAY, POD_ALLOCATION_INTERVAL)
+      .set("spark.app.id", APP_ID)
+      .set(KUBERNETES_DRIVER_POD_NAME, DRIVER_POD_NAME)
+      .set(KUBERNETES_NAMESPACE, NAMESPACE)
+      .set("spark.driver.host", SPARK_DRIVER_HOST)
+      .set("spark.driver.port", SPARK_DRIVER_PORT.toString)
+      .set(KUBERNETES_ALLOCATION_BATCH_DELAY, POD_ALLOCATION_INTERVAL)
     executorPodsWatcherArgument = ArgumentCaptor.forClass(classOf[Watcher[Pod]])
     allocatorRunnable = ArgumentCaptor.forClass(classOf[Runnable])
     requestExecutorRunnable = ArgumentCaptor.forClass(classOf[Runnable])
@@ -154,21 +154,21 @@ private[spark] class KubernetesClusterSchedulerBackendSuite
     when(kubernetesClient.pods()).thenReturn(podOperations)
     when(podOperations.withLabel(SPARK_APP_ID_LABEL, APP_ID)).thenReturn(podsWithLabelOperations)
     when(podsWithLabelOperations.watch(executorPodsWatcherArgument.capture()))
-        .thenReturn(executorPodsWatch)
+      .thenReturn(executorPodsWatch)
     when(podOperations.inNamespace(NAMESPACE)).thenReturn(podsInNamespace)
     when(podsInNamespace.withName(DRIVER_POD_NAME)).thenReturn(podsWithDriverName)
     when(podsWithDriverName.get()).thenReturn(driverPod)
     when(allocatorExecutor.scheduleWithFixedDelay(
-        allocatorRunnable.capture(),
-        mockitoEq(0L),
-        mockitoEq(POD_ALLOCATION_INTERVAL),
-        mockitoEq(TimeUnit.SECONDS))).thenReturn(null)
+      allocatorRunnable.capture(),
+      mockitoEq(0L),
+      mockitoEq(POD_ALLOCATION_INTERVAL),
+      mockitoEq(TimeUnit.SECONDS))).thenReturn(null)
     // Creating Futures in Scala backed by a Java executor service resolves to running
     // ExecutorService#execute (as opposed to submit)
     doNothing().when(requestExecutorsService).execute(requestExecutorRunnable.capture())
     when(rpcEnv.setupEndpoint(
-        mockitoEq(CoarseGrainedSchedulerBackend.ENDPOINT_NAME), driverEndpoint.capture()))
-        .thenReturn(driverEndpointRef)
+      mockitoEq(CoarseGrainedSchedulerBackend.ENDPOINT_NAME), driverEndpoint.capture()))
+      .thenReturn(driverEndpointRef)
     when(driverEndpointRef.ask[Boolean]
       (any(classOf[Any]))
       (any())).thenReturn(mock[Future[Boolean]])
@@ -185,8 +185,8 @@ private[spark] class KubernetesClusterSchedulerBackendSuite
 
   test("Static allocation should request executors upon first allocator run.") {
     sparkConf
-        .set(KUBERNETES_ALLOCATION_BATCH_SIZE, 2)
-        .set(org.apache.spark.internal.config.EXECUTOR_INSTANCES, 2)
+      .set(KUBERNETES_ALLOCATION_BATCH_SIZE, 2)
+      .set(org.apache.spark.internal.config.EXECUTOR_INSTANCES, 2)
     val scheduler = newSchedulerBackend()
     scheduler.start()
     requestExecutorRunnable.getValue.run()
@@ -200,15 +200,15 @@ private[spark] class KubernetesClusterSchedulerBackendSuite
 
   test("Killing executors deletes the executor pods") {
     sparkConf
-        .set(KUBERNETES_ALLOCATION_BATCH_SIZE, 2)
-        .set(org.apache.spark.internal.config.EXECUTOR_INSTANCES, 2)
+      .set(KUBERNETES_ALLOCATION_BATCH_SIZE, 2)
+      .set(org.apache.spark.internal.config.EXECUTOR_INSTANCES, 2)
     val scheduler = newSchedulerBackend()
     scheduler.start()
     requestExecutorRunnable.getValue.run()
     expectPodCreationWithId(1, FIRST_EXECUTOR_POD)
     expectPodCreationWithId(2, SECOND_EXECUTOR_POD)
     when(podOperations.create(any(classOf[Pod])))
-        .thenAnswer(AdditionalAnswers.returnsFirstArg())
+      .thenAnswer(AdditionalAnswers.returnsFirstArg())
     allocatorRunnable.getValue.run()
     scheduler.doKillExecutors(Seq("2"))
     requestExecutorRunnable.getAllValues.asScala.last.run()
@@ -218,8 +218,8 @@ private[spark] class KubernetesClusterSchedulerBackendSuite
 
   test("Executors should be requested in batches.") {
     sparkConf
-        .set(KUBERNETES_ALLOCATION_BATCH_SIZE, 1)
-        .set(org.apache.spark.internal.config.EXECUTOR_INSTANCES, 2)
+      .set(KUBERNETES_ALLOCATION_BATCH_SIZE, 1)
+      .set(org.apache.spark.internal.config.EXECUTOR_INSTANCES, 2)
     val scheduler = newSchedulerBackend()
     scheduler.start()
     requestExecutorRunnable.getValue.run()
@@ -231,10 +231,10 @@ private[spark] class KubernetesClusterSchedulerBackendSuite
     verify(podOperations).create(FIRST_EXECUTOR_POD)
     verify(podOperations, never()).create(SECOND_EXECUTOR_POD)
     val registerFirstExecutorMessage = RegisterExecutor(
-        "1", mock[RpcEndpointRef], "localhost", 1, Map.empty[String, String])
+      "1", mock[RpcEndpointRef], "localhost", 1, Map.empty[String, String])
     when(taskSchedulerImpl.resourceOffers(any())).thenReturn(Seq.empty)
     driverEndpoint.getValue.receiveAndReply(mock[RpcCallContext])
-        .apply(registerFirstExecutorMessage)
+      .apply(registerFirstExecutorMessage)
     allocatorRunnable.getValue.run()
     verify(podOperations).create(SECOND_EXECUTOR_POD)
   }
@@ -242,22 +242,22 @@ private[spark] class KubernetesClusterSchedulerBackendSuite
   test("Deleting executors and then running an allocator pass after finding the loss reason" +
     " should only delete the pod once.") {
     sparkConf
-        .set(KUBERNETES_ALLOCATION_BATCH_SIZE, 1)
-        .set(org.apache.spark.internal.config.EXECUTOR_INSTANCES, 1)
+      .set(KUBERNETES_ALLOCATION_BATCH_SIZE, 1)
+      .set(org.apache.spark.internal.config.EXECUTOR_INSTANCES, 1)
     val scheduler = newSchedulerBackend()
     scheduler.start()
     requestExecutorRunnable.getValue.run()
     when(podOperations.create(any(classOf[Pod])))
-        .thenAnswer(AdditionalAnswers.returnsFirstArg())
+      .thenAnswer(AdditionalAnswers.returnsFirstArg())
     expectPodCreationWithId(1, FIRST_EXECUTOR_POD)
     allocatorRunnable.getValue.run()
     val executorEndpointRef = mock[RpcEndpointRef]
     when(executorEndpointRef.address).thenReturn(RpcAddress("pod.example.com", 9000))
     val registerFirstExecutorMessage = RegisterExecutor(
-        "1", executorEndpointRef, "localhost:9000", 1, Map.empty[String, String])
+      "1", executorEndpointRef, "localhost:9000", 1, Map.empty[String, String])
     when(taskSchedulerImpl.resourceOffers(any())).thenReturn(Seq.empty)
     driverEndpoint.getValue.receiveAndReply(mock[RpcCallContext])
-        .apply(registerFirstExecutorMessage)
+      .apply(registerFirstExecutorMessage)
     scheduler.doRequestTotalExecutors(0)
     requestExecutorRunnable.getAllValues.asScala.last.run()
     scheduler.doKillExecutors(Seq("1"))
@@ -270,17 +270,17 @@ private[spark] class KubernetesClusterSchedulerBackendSuite
     allocatorRunnable.getValue.run()
     verify(podOperations, times(1)).delete(FIRST_EXECUTOR_POD)
     verify(driverEndpointRef, times(1)).ask[Boolean](
-        RemoveExecutor("1", ExecutorExited(
-            0,
-            exitCausedByApp = false,
-            s"Container in pod ${exitedPod.getMetadata.getName} exited from" +
-                s" explicit termination request.")))
+      RemoveExecutor("1", ExecutorExited(
+        0,
+        exitCausedByApp = false,
+        s"Container in pod ${exitedPod.getMetadata.getName} exited from" +
+          s" explicit termination request.")))
   }
 
   test("Executors that disconnect from application errors are noted as exits caused by app.") {
     sparkConf
-        .set(KUBERNETES_ALLOCATION_BATCH_SIZE, 1)
-        .set(org.apache.spark.internal.config.EXECUTOR_INSTANCES, 1)
+      .set(KUBERNETES_ALLOCATION_BATCH_SIZE, 1)
+      .set(org.apache.spark.internal.config.EXECUTOR_INSTANCES, 1)
     val scheduler = newSchedulerBackend()
     scheduler.start()
     expectPodCreationWithId(1, FIRST_EXECUTOR_POD)
@@ -290,32 +290,32 @@ private[spark] class KubernetesClusterSchedulerBackendSuite
     val executorEndpointRef = mock[RpcEndpointRef]
     when(executorEndpointRef.address).thenReturn(RpcAddress("pod.example.com", 9000))
     val registerFirstExecutorMessage = RegisterExecutor(
-        "1", executorEndpointRef, "localhost:9000", 1, Map.empty[String, String])
+      "1", executorEndpointRef, "localhost:9000", 1, Map.empty[String, String])
     when(taskSchedulerImpl.resourceOffers(any())).thenReturn(Seq.empty)
     driverEndpoint.getValue.receiveAndReply(mock[RpcCallContext])
-        .apply(registerFirstExecutorMessage)
+      .apply(registerFirstExecutorMessage)
     driverEndpoint.getValue.onDisconnected(executorEndpointRef.address)
     executorPodsWatcherArgument.getValue.eventReceived(
-        Action.ERROR, exitPod(FIRST_EXECUTOR_POD, 1))
+      Action.ERROR, exitPod(FIRST_EXECUTOR_POD, 1))
 
     expectPodCreationWithId(2, SECOND_EXECUTOR_POD)
     scheduler.doRequestTotalExecutors(1)
     requestExecutorRunnable.getValue.run()
     allocatorRunnable.getAllValues.asScala.last.run()
     verify(driverEndpointRef).ask[Boolean](
-        RemoveExecutor("1", ExecutorExited(
-            1,
-            exitCausedByApp = true,
-            s"Pod ${FIRST_EXECUTOR_POD.getMetadata.getName}'s executor container exited with" +
-                " exit status code 1.")))
+      RemoveExecutor("1", ExecutorExited(
+        1,
+        exitCausedByApp = true,
+        s"Pod ${FIRST_EXECUTOR_POD.getMetadata.getName}'s executor container exited with" +
+          " exit status code 1.")))
     verify(podOperations, never()).delete(FIRST_EXECUTOR_POD)
   }
 
   test("Executors should only try to get the loss reason a number of times before giving up and" +
     " removing the executor.") {
     sparkConf
-        .set(KUBERNETES_ALLOCATION_BATCH_SIZE, 1)
-        .set(org.apache.spark.internal.config.EXECUTOR_INSTANCES, 1)
+      .set(KUBERNETES_ALLOCATION_BATCH_SIZE, 1)
+      .set(org.apache.spark.internal.config.EXECUTOR_INSTANCES, 1)
     val scheduler = newSchedulerBackend()
     scheduler.start()
     expectPodCreationWithId(1, FIRST_EXECUTOR_POD)
@@ -325,10 +325,10 @@ private[spark] class KubernetesClusterSchedulerBackendSuite
     val executorEndpointRef = mock[RpcEndpointRef]
     when(executorEndpointRef.address).thenReturn(RpcAddress("pod.example.com", 9000))
     val registerFirstExecutorMessage = RegisterExecutor(
-        "1", executorEndpointRef, "localhost:9000", 1, Map.empty[String, String])
+      "1", executorEndpointRef, "localhost:9000", 1, Map.empty[String, String])
     when(taskSchedulerImpl.resourceOffers(any())).thenReturn(Seq.empty)
     driverEndpoint.getValue.receiveAndReply(mock[RpcCallContext])
-        .apply(registerFirstExecutorMessage)
+      .apply(registerFirstExecutorMessage)
     driverEndpoint.getValue.onDisconnected(executorEndpointRef.address)
     1 to KubernetesClusterSchedulerBackend.MAX_EXECUTOR_LOST_REASON_CHECKS foreach { _ =>
       allocatorRunnable.getValue.run()
@@ -343,12 +343,12 @@ private[spark] class KubernetesClusterSchedulerBackendSuite
 
   private def newSchedulerBackend(): KubernetesClusterSchedulerBackend = {
     new KubernetesClusterSchedulerBackend(
-        taskSchedulerImpl,
-        rpcEnv,
-        executorPodFactory,
-        kubernetesClient,
-        allocatorExecutor,
-        requestExecutorsService)
+      taskSchedulerImpl,
+      rpcEnv,
+      executorPodFactory,
+      kubernetesClient,
+      allocatorExecutor,
+      requestExecutorsService)
   }
 
   private def exitPod(basePod: Pod, exitCode: Int): Pod = {
@@ -367,12 +367,11 @@ private[spark] class KubernetesClusterSchedulerBackendSuite
 
   private def expectPodCreationWithId(executorId: Int, expectedPod: Pod): Unit = {
     when(executorPodFactory.createExecutorPod(
-        executorId.toString,
-        APP_ID,
-        DRIVER_URL,
-        sparkConf.getExecutorEnv,
-        driverPod,
-        Map.empty)).thenReturn(expectedPod)
+      executorId.toString,
+      APP_ID,
+      DRIVER_URL,
+      sparkConf.getExecutorEnv,
+      driverPod,
+      Map.empty)).thenReturn(expectedPod)
   }
-
 }