Skip to content

Commit

Permalink
Bypass init-containers when possible (apache#348)
Browse files Browse the repository at this point in the history
  • Loading branch information
chenchun authored and ash211 committed Jun 23, 2017
1 parent a6291c6 commit 08fe944
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -156,31 +156,33 @@ private[spark] class Client(
.addToContainers(driverContainer)
.endSpec()

val maybeSubmittedDependencyUploader = initContainerComponentsProvider
.provideInitContainerSubmittedDependencyUploader(allDriverLabels)
val maybeSubmittedResourceIdentifiers = maybeSubmittedDependencyUploader.map { uploader =>
val maybeSubmittedResourceIdentifiers = initContainerComponentsProvider
.provideInitContainerSubmittedDependencyUploader(allDriverLabels)
.map { uploader =>
SubmittedResources(uploader.uploadJars(), uploader.uploadFiles())
}
val maybeSecretBuilder = initContainerComponentsProvider
.provideSubmittedDependenciesSecretBuilder(
maybeSubmittedResourceIdentifiers.map(_.secrets()))
val maybeSubmittedDependenciesSecret = maybeSecretBuilder.map(_.build())
val initContainerConfigMap = initContainerComponentsProvider
.provideInitContainerConfigMapBuilder(maybeSubmittedResourceIdentifiers.map(_.ids()))
.build()
val podWithInitContainer = initContainerComponentsProvider
.provideInitContainerBootstrap()
.bootstrapInitContainerAndVolumes(driverContainer.getName, basePod)
val maybeSubmittedDependenciesSecret = initContainerComponentsProvider
.provideSubmittedDependenciesSecretBuilder(
maybeSubmittedResourceIdentifiers.map(_.secrets()))
.map(_.build())

val containerLocalizedFilesResolver = initContainerComponentsProvider
.provideContainerLocalizedFilesResolver()
.provideContainerLocalizedFilesResolver()
val resolvedSparkJars = containerLocalizedFilesResolver.resolveSubmittedSparkJars()
val resolvedSparkFiles = containerLocalizedFilesResolver.resolveSubmittedSparkFiles()

val executorInitContainerConfiguration = initContainerComponentsProvider
.provideExecutorInitContainerConfiguration()
val sparkConfWithExecutorInit = executorInitContainerConfiguration
.configureSparkConfForExecutorInitContainer(sparkConf)
val initContainerBundler = initContainerComponentsProvider
.provideInitContainerBundle(maybeSubmittedResourceIdentifiers.map(_.ids()),
resolvedSparkJars ++ resolvedSparkFiles)

val podWithInitContainer = initContainerBundler.map(
_.sparkPodInitContainerBootstrap
.bootstrapInitContainerAndVolumes(driverContainer.getName, basePod))
.getOrElse(basePod)
val sparkConfWithExecutorInit = initContainerBundler.map(
_.executorInitContainerConfiguration
.configureSparkConfForExecutorInitContainer(sparkConf))
.getOrElse(sparkConf)
val credentialsMounter = kubernetesCredentialsMounterProvider
.getDriverPodKubernetesCredentialsMounter()
val credentialsSecret = credentialsMounter.createCredentialsSecret()
Expand Down Expand Up @@ -224,7 +226,8 @@ private[spark] class Client(
.watch(loggingPodStatusWatcher)) { _ =>
val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
try {
val driverOwnedResources = Seq(initContainerConfigMap) ++
val driverOwnedResources = initContainerBundler.map(
_.sparkInitContainerConfigMap).toSeq ++
maybeSubmittedDependenciesSecret.toSeq ++
credentialsSecret.toSeq
val driverPodOwnerReference = new OwnerReferenceBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.spark.deploy.kubernetes.submit

import io.fabric8.kubernetes.api.model.ConfigMap

import org.apache.spark.{SparkConf, SSLOptions}
import org.apache.spark.deploy.kubernetes.{InitContainerResourceStagingServerSecretPluginImpl, OptionRequirements, SparkPodInitContainerBootstrap, SparkPodInitContainerBootstrapImpl}
import org.apache.spark.deploy.kubernetes.config._
Expand All @@ -30,17 +32,15 @@ import org.apache.spark.util.Utils
*/
private[spark] trait DriverInitContainerComponentsProvider {

def provideInitContainerConfigMapBuilder(
maybeSubmittedResourceIds: Option[SubmittedResourceIds])
: SparkInitContainerConfigMapBuilder
def provideContainerLocalizedFilesResolver(): ContainerLocalizedFilesResolver
def provideExecutorInitContainerConfiguration(): ExecutorInitContainerConfiguration
def provideInitContainerSubmittedDependencyUploader(
driverPodLabels: Map[String, String]): Option[SubmittedDependencyUploader]
def provideSubmittedDependenciesSecretBuilder(
maybeSubmittedResourceSecrets: Option[SubmittedResourceSecrets])
: Option[SubmittedDependencySecretBuilder]
def provideInitContainerBootstrap(): SparkPodInitContainerBootstrap
def provideInitContainerBundle(maybeSubmittedResourceIds: Option[SubmittedResourceIds],
uris: Iterable[String]): Option[InitContainerBundle]
}

private[spark] class DriverInitContainerComponentsProviderImpl(
Expand Down Expand Up @@ -105,9 +105,8 @@ private[spark] class DriverInitContainerComponentsProviderImpl(
private val dockerImagePullPolicy = sparkConf.get(DOCKER_IMAGE_PULL_POLICY)
private val downloadTimeoutMinutes = sparkConf.get(INIT_CONTAINER_MOUNT_TIMEOUT)

override def provideInitContainerConfigMapBuilder(
maybeSubmittedResourceIds: Option[SubmittedResourceIds])
: SparkInitContainerConfigMapBuilder = {
private def provideInitContainerConfigMap(
maybeSubmittedResourceIds: Option[SubmittedResourceIds]): ConfigMap = {
val submittedDependencyConfigPlugin = for {
stagingServerUri <- maybeResourceStagingServerUri
jarsResourceId <- maybeSubmittedResourceIds.map(_.jarsResourceId)
Expand Down Expand Up @@ -136,15 +135,15 @@ private[spark] class DriverInitContainerComponentsProviderImpl(
filesDownloadPath,
configMapName,
configMapKey,
submittedDependencyConfigPlugin)
submittedDependencyConfigPlugin).build()
}

override def provideContainerLocalizedFilesResolver(): ContainerLocalizedFilesResolver = {
new ContainerLocalizedFilesResolverImpl(
sparkJars, sparkFiles, jarsDownloadPath, filesDownloadPath)
}

override def provideExecutorInitContainerConfiguration(): ExecutorInitContainerConfiguration = {
private def provideExecutorInitContainerConfiguration(): ExecutorInitContainerConfiguration = {
new ExecutorInitContainerConfigurationImpl(
maybeSecretName,
INIT_CONTAINER_SECRET_VOLUME_MOUNT_PATH,
Expand Down Expand Up @@ -202,4 +201,16 @@ private[spark] class DriverInitContainerComponentsProviderImpl(
configMapKey,
resourceStagingServerSecretPlugin)
}

override def provideInitContainerBundle(
maybeSubmittedResourceIds: Option[SubmittedResourceIds],
uris: Iterable[String]): Option[InitContainerBundle] = {
val containerLocalizedFilesResolver = provideContainerLocalizedFilesResolver()
// Bypass init-containers if `spark.jars` and `spark.files` is empty or only has `local://` URIs
if (KubernetesFileUtils.getNonContainerLocalFiles(uris).nonEmpty) {
Some(InitContainerBundle(provideInitContainerConfigMap(maybeSubmittedResourceIds),
provideInitContainerBootstrap(),
provideExecutorInitContainerConfiguration()))
} else None
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.kubernetes.submit

import io.fabric8.kubernetes.api.model.ConfigMap

import org.apache.spark.deploy.kubernetes.{SparkPodInitContainerBootstrap}

case class InitContainerBundle(
sparkInitContainerConfigMap: ConfigMap,
sparkPodInitContainerBootstrap: SparkPodInitContainerBootstrap,
executorInitContainerConfiguration: ExecutorInitContainerConfiguration)
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ private[spark] object KubernetesFileUtils {
filterUriStringsByScheme(uris, _ == "local")
}

def getNonContainerLocalFiles(uris: Iterable[String]): Iterable[String] = {
filterUriStringsByScheme(uris, _ != "local")
}

def getOnlySubmitterLocalFiles(uris: Iterable[String]): Iterable[String] = {
filterUriStringsByScheme(uris, _ == "file")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,6 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter {
private val CREDENTIALS_SET_CONF = "spark.kubernetes.driverCredentials.provided"
private val CREDENTIALS_SET_ANNOTATION = "credentials-set"

@Mock
private var initContainerConfigMapBuilder: SparkInitContainerConfigMapBuilder = _
@Mock
private var containerLocalizedFilesResolver: ContainerLocalizedFilesResolver = _
@Mock
Expand Down Expand Up @@ -173,12 +171,8 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter {
})
when(initContainerComponentsProvider.provideContainerLocalizedFilesResolver())
.thenReturn(containerLocalizedFilesResolver)
when(initContainerComponentsProvider.provideExecutorInitContainerConfiguration())
.thenReturn(executorInitContainerConfiguration)
when(submittedDependenciesSecretBuilder.build())
.thenReturn(INIT_CONTAINER_SECRET)
when(initContainerConfigMapBuilder.build())
.thenReturn(INIT_CONTAINER_CONFIG_MAP)
when(kubernetesClient.pods()).thenReturn(podOps)
when(podOps.create(any())).thenAnswer(new Answer[Pod] {
override def answer(invocation: InvocationOnMock): Pod = {
Expand Down Expand Up @@ -214,9 +208,10 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter {
when(initContainerComponentsProvider
.provideSubmittedDependenciesSecretBuilder(Some(SUBMITTED_RESOURCES.secrets())))
.thenReturn(Some(submittedDependenciesSecretBuilder))
when(initContainerComponentsProvider
.provideInitContainerConfigMapBuilder(Some(SUBMITTED_RESOURCES.ids())))
.thenReturn(initContainerConfigMapBuilder)
when(initContainerComponentsProvider.provideInitContainerBundle(Some(SUBMITTED_RESOURCES.ids()),
RESOLVED_SPARK_JARS ++ RESOLVED_SPARK_FILES))
.thenReturn(Some(InitContainerBundle(INIT_CONTAINER_CONFIG_MAP,
initContainerBootstrap, executorInitContainerConfiguration)))
runAndVerifyDriverPodHasCorrectProperties()
val resourceListArgumentCaptor = ArgumentCaptor.forClass(classOf[HasMetadata])
verify(kubernetesClient).resourceList(resourceListArgumentCaptor.capture())
Expand All @@ -232,8 +227,6 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter {
verifyConfigMapWasCreated(createdResources)
verify(submittedDependencyUploader).uploadJars()
verify(submittedDependencyUploader).uploadFiles()
verify(initContainerComponentsProvider)
.provideInitContainerConfigMapBuilder(Some(SUBMITTED_RESOURCES.ids()))
verify(initContainerComponentsProvider)
.provideSubmittedDependenciesSecretBuilder(Some(SUBMITTED_RESOURCES.secrets()))
}
Expand All @@ -250,8 +243,6 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter {
verifyConfigMapWasCreated(createdResources)
verify(submittedDependencyUploader, times(0)).uploadJars()
verify(submittedDependencyUploader, times(0)).uploadFiles()
verify(initContainerComponentsProvider)
.provideInitContainerConfigMapBuilder(None)
verify(initContainerComponentsProvider)
.provideSubmittedDependenciesSecretBuilder(None)
}
Expand Down Expand Up @@ -321,9 +312,10 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter {
when(initContainerComponentsProvider
.provideSubmittedDependenciesSecretBuilder(None))
.thenReturn(None)
when(initContainerComponentsProvider
.provideInitContainerConfigMapBuilder(None))
.thenReturn(initContainerConfigMapBuilder)
when(initContainerComponentsProvider.provideInitContainerBundle(None, RESOLVED_SPARK_JARS ++
RESOLVED_SPARK_FILES))
.thenReturn(Some(InitContainerBundle(INIT_CONTAINER_CONFIG_MAP,
initContainerBootstrap, executorInitContainerConfiguration)))
}

private def expectationsForNoMountedCredentials(): Unit = {
Expand Down

0 comments on commit 08fe944

Please sign in to comment.