Skip to content

Commit

Permalink
Download remotely-located resources on driver and executor startup via init-container (apache#251)
Browse files Browse the repository at this point in the history

* Download remotely-located resources on driver startup. Use init-container in executors.

* Fix owner reference slightly

* Clean up config

* Don't rely too heavily on conventions that can change

* Fix flaky test

* Tidy up file resolver

* Whitespace arrangement

* Indentation change

* Fix more indentation

* Consolidate init container component providers

* Minor method signature and comment changes

* Rename class for consistency

* Resolve conflicts

* Fix flaky test

* Add some tests and some refactoring.

* Make naming consistent for Staged -> Submitted

* Add unit test for the submission client.

* Refine expectations

* Rename variables and fix typos

* Address more comments. Remove redundant SingleKeyConfigMap.

* Minor test adjustments.

* add another test

* Fix conflicts.
  • Loading branch information
mccheah authored and foxish committed Jul 24, 2017
1 parent 20956e7 commit 30597f6
Show file tree
Hide file tree
Showing 46 changed files with 2,620 additions and 1,233 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.kubernetes

import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder, Secret}

import org.apache.spark.deploy.kubernetes.constants._

/**
 * Hooks for wiring resource-staging-server credentials into a pod: one method mounts
 * the secret files into the init-container, the other declares the backing Secret
 * volume on the pod itself. Both mutations refer to the same volume, so callers are
 * expected to apply the two methods together on the same pod.
 */
private[spark] trait InitContainerResourceStagingServerSecretPlugin {

/**
* Configure the init-container to mount the secret files that allow it to retrieve dependencies
* from a resource staging server.
*/
def mountResourceStagingServerSecretIntoInitContainer(
initContainer: ContainerBuilder): ContainerBuilder

/**
* Configure the pod to attach a Secret volume which hosts secret files allowing the
* init-container to retrieve dependencies from the resource staging server.
*/
def addResourceStagingServerSecretVolumeToPod(basePod: PodBuilder): PodBuilder
}

/**
 * Default [[InitContainerResourceStagingServerSecretPlugin]]: mounts the named
 * Kubernetes Secret into the init-container at the configured path, and attaches
 * the Secret-backed volume to the pod spec under the shared volume name
 * INIT_CONTAINER_SECRET_VOLUME_NAME.
 *
 * @param initContainerSecretName name of the Kubernetes Secret to expose
 * @param initContainerSecretMountPath path inside the init-container at which the
 *                                     secret files are mounted
 */
private[spark] class InitContainerResourceStagingServerSecretPluginImpl(
    initContainerSecretName: String,
    initContainerSecretMountPath: String)
  extends InitContainerResourceStagingServerSecretPlugin {

  override def mountResourceStagingServerSecretIntoInitContainer(
      initContainer: ContainerBuilder): ContainerBuilder = {
    // Mount point referencing the volume declared in
    // addResourceStagingServerSecretVolumeToPod (same volume name).
    val mountEditor = initContainer.addNewVolumeMount()
    mountEditor.withName(INIT_CONTAINER_SECRET_VOLUME_NAME)
    mountEditor.withMountPath(initContainerSecretMountPath)
    mountEditor.endVolumeMount()
  }

  override def addResourceStagingServerSecretVolumeToPod(basePod: PodBuilder): PodBuilder = {
    // Declare the Secret-backed volume that the init-container's mount refers to.
    val volumeEditor = basePod.editSpec().addNewVolume()
    volumeEditor.withName(INIT_CONTAINER_SECRET_VOLUME_NAME)
    val secretEditor = volumeEditor.withNewSecret()
    secretEditor.withSecretName(initContainerSecretName)
    secretEditor.endSecret()
    volumeEditor.endVolume().endSpec()
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.kubernetes

import io.fabric8.kubernetes.api.model.{ContainerBuilder, EmptyDirVolumeSource, PodBuilder, VolumeMount, VolumeMountBuilder}

import org.apache.spark.deploy.kubernetes.constants._
import org.apache.spark.deploy.kubernetes.submit.v2.{ContainerNameEqualityPredicate, InitContainerUtil}

/**
 * Attaches to a pod an init-container that downloads dependencies before the main
 * container starts, together with the volumes the two containers share.
 */
private[spark] trait SparkPodInitContainerBootstrap {
/**
* Bootstraps an init-container that downloads dependencies to be used by a main container.
* Note that this primarily assumes that the init-container's configuration is being provided
* by a ConfigMap that was installed by some other component; that is, the implementation
* here makes no assumptions about how the init-container is specifically configured. For
* example, this class is unaware if the init-container is fetching remote dependencies or if
* it is fetching dependencies from a resource staging server.
*/
def bootstrapInitContainerAndVolumes(
mainContainerName: String, originalPodSpec: PodBuilder): PodBuilder
}

/**
 * Default [[SparkPodInitContainerBootstrap]]: appends a "spark-init" init-container to
 * the pod, declares the volumes backing its mounts (a properties ConfigMap plus two
 * emptyDir download directories), and shares the download directories with the main
 * container.
 *
 * @param initContainerImage docker image the init-container runs
 * @param jarsDownloadPath directory (shared via an emptyDir volume) where jars land
 * @param filesDownloadPath directory (shared via an emptyDir volume) where files land
 * @param downloadTimeoutMinutes NOTE(review): not referenced anywhere in this class as
 *                               shown — confirm whether it should bound the download wait
 * @param initContainerConfigMapName ConfigMap providing the init-container's properties file
 * @param initContainerConfigMapKey key in that ConfigMap holding the properties contents
 * @param resourceStagingServerSecretPlugin when present, mounts resource staging server
 *                                          credentials into the init-container and pod
 */
private[spark] class SparkPodInitContainerBootstrapImpl(
    initContainerImage: String,
    jarsDownloadPath: String,
    filesDownloadPath: String,
    downloadTimeoutMinutes: Long,
    initContainerConfigMapName: String,
    initContainerConfigMapKey: String,
    resourceStagingServerSecretPlugin: Option[InitContainerResourceStagingServerSecretPlugin])
  extends SparkPodInitContainerBootstrap {

  override def bootstrapInitContainerAndVolumes(
      mainContainerName: String,
      originalPodSpec: PodBuilder): PodBuilder = {
    // Mounts shared between the init-container (which writes downloaded dependencies)
    // and the main container (which reads them).
    val sharedVolumeMounts = Seq[VolumeMount](
      new VolumeMountBuilder()
        .withName(INIT_CONTAINER_DOWNLOAD_JARS_VOLUME_NAME)
        .withMountPath(jarsDownloadPath)
        .build(),
      new VolumeMountBuilder()
        .withName(INIT_CONTAINER_DOWNLOAD_FILES_VOLUME_NAME)
        .withMountPath(filesDownloadPath)
        .build())

    val initContainer = new ContainerBuilder()
      // Fix: plain string literal — the original used the `s` interpolator with
      // nothing to interpolate (s"spark-init").
      .withName("spark-init")
      .withImage(initContainerImage)
      .withImagePullPolicy("IfNotPresent")
      .addNewVolumeMount()
        .withName(INIT_CONTAINER_PROPERTIES_FILE_VOLUME)
        .withMountPath(INIT_CONTAINER_PROPERTIES_FILE_DIR)
        .endVolumeMount()
      .addToVolumeMounts(sharedVolumeMounts: _*)
      .addToArgs(INIT_CONTAINER_PROPERTIES_FILE_PATH)
    // Let the secret plugin, when configured, add its mount before the container is built.
    val resolvedInitContainer = resourceStagingServerSecretPlugin.map { plugin =>
      plugin.mountResourceStagingServerSecretIntoInitContainer(initContainer)
    }.getOrElse(initContainer).build()
    // Append the init-container and declare the volumes backing its mounts: the
    // properties ConfigMap plus the two emptyDir download volumes. The main container
    // (matched by name) gets the shared download mounts as well.
    val podWithBasicVolumes = InitContainerUtil.appendInitContainer(
        originalPodSpec, resolvedInitContainer)
      .editSpec()
        .addNewVolume()
          .withName(INIT_CONTAINER_PROPERTIES_FILE_VOLUME)
          .withNewConfigMap()
            .withName(initContainerConfigMapName)
            .addNewItem()
              .withKey(initContainerConfigMapKey)
              .withPath(INIT_CONTAINER_PROPERTIES_FILE_NAME)
              .endItem()
            .endConfigMap()
          .endVolume()
        .addNewVolume()
          .withName(INIT_CONTAINER_DOWNLOAD_JARS_VOLUME_NAME)
          .withEmptyDir(new EmptyDirVolumeSource())
          .endVolume()
        .addNewVolume()
          .withName(INIT_CONTAINER_DOWNLOAD_FILES_VOLUME_NAME)
          .withEmptyDir(new EmptyDirVolumeSource())
          .endVolume()
        .editMatchingContainer(new ContainerNameEqualityPredicate(mainContainerName))
          .addToVolumeMounts(sharedVolumeMounts: _*)
          .endContainer()
        .endSpec()
    // The plugin, when configured, also attaches the Secret volume backing its mount.
    resourceStagingServerSecretPlugin.map { plugin =>
      plugin.addResourceStagingServerSecretVolumeToPod(podWithBasicVolumes)
    }.getOrElse(podWithBasicVolumes)
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -349,42 +349,43 @@ package object config extends Logging {
.stringConf
.createOptional

// NOTE(review): this section is a diff rendering. Where two adjacent
// ConfigBuilder(...) lines appear under one val, the first is the pre-change
// literal key and the second is the current key built from
// RESOURCE_STAGING_SERVER_SSL_NAMESPACE; only one of each pair belongs in the
// real source file.
// SSL settings for talking to the resource staging server, all under the
// "spark.ssl.kubernetes.resourceStagingServer" prefix.
private[spark] val RESOURCE_STAGING_SERVER_SSL_NAMESPACE = "kubernetes.resourceStagingServer"
private[spark] val RESOURCE_STAGING_SERVER_CERT_PEM =
ConfigBuilder("spark.ssl.kubernetes.resourceStagingServer.serverCertPem")
ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_SSL_NAMESPACE.serverCertPem")
.doc("Certificate PEM file to use when having the Kubernetes dependency server" +
" listen on TLS.")
.stringConf
.createOptional

private[spark] val RESOURCE_STAGING_SERVER_KEYSTORE_PASSWORD_FILE =
ConfigBuilder("spark.ssl.kubernetes.resourceStagingServer.keyStorePasswordFile")
ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_SSL_NAMESPACE.keyStorePasswordFile")
.doc("File containing the keystore password for the Kubernetes dependency server.")
.stringConf
.createOptional

private[spark] val RESOURCE_STAGING_SERVER_KEYSTORE_KEY_PASSWORD_FILE =
ConfigBuilder("spark.ssl.kubernetes.resourceStagingServer.keyPasswordFile")
ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_SSL_NAMESPACE.keyPasswordFile")
.doc("File containing the key password for the Kubernetes dependency server.")
.stringConf
.createOptional

private[spark] val RESOURCE_STAGING_SERVER_SSL_ENABLED =
ConfigBuilder("spark.ssl.kubernetes.resourceStagingServer.enabled")
ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_SSL_NAMESPACE.enabled")
.doc("Whether or not to use SSL when communicating with the dependency server.")
.booleanConf
.createOptional
private[spark] val RESOURCE_STAGING_SERVER_TRUSTSTORE_FILE =
ConfigBuilder("spark.ssl.kubernetes.resourceStagingServer.trustStore")
ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_SSL_NAMESPACE.trustStore")
.doc("File containing the trustStore to communicate with the Kubernetes dependency server.")
.stringConf
.createOptional
private[spark] val RESOURCE_STAGING_SERVER_TRUSTSTORE_PASSWORD =
ConfigBuilder("spark.ssl.kubernetes.resourceStagingServer.trustStorePassword")
ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_SSL_NAMESPACE.trustStorePassword")
.doc("Password for the trustStore for talking to the dependency server.")
.stringConf
.createOptional
private[spark] val RESOURCE_STAGING_SERVER_TRUSTSTORE_TYPE =
ConfigBuilder("spark.ssl.kubernetes.resourceStagingServer.trustStoreType")
ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_SSL_NAMESPACE.trustStoreType")
.doc("Type of trustStore for communicating with the dependency server.")
.stringConf
.createOptional
Expand All @@ -397,64 +398,120 @@ package object config extends Logging {
.createOptional

// NOTE(review): this section is a diff rendering. Where adjacent duplicate
// ConfigBuilder(...) or .createWithDefault(...) lines appear under one val, the
// first is the pre-change version and the second is the current one; only one of
// each pair belongs in the real source file.
// Internal properties the submission client passes to the init-container,
// identifying what to download from the resource staging server and where the
// credentials for it are mounted.
private[spark] val INIT_CONTAINER_DOWNLOAD_JARS_RESOURCE_IDENTIFIER =
ConfigBuilder("spark.kubernetes.driver.initcontainer.downloadJarsResourceIdentifier")
ConfigBuilder("spark.kubernetes.initcontainer.downloadJarsResourceIdentifier")
.doc("Identifier for the jars tarball that was uploaded to the staging service.")
.internal()
.stringConf
.createOptional

private[spark] val INIT_CONTAINER_DOWNLOAD_JARS_SECRET_LOCATION =
ConfigBuilder("spark.kubernetes.driver.initcontainer.downloadJarsSecretLocation")
ConfigBuilder("spark.kubernetes.initcontainer.downloadJarsSecretLocation")
.doc("Location of the application secret to use when the init-container contacts the" +
" resource staging server to download jars.")
.internal()
.stringConf
.createWithDefault(INIT_CONTAINER_DOWNLOAD_JARS_SECRET_PATH)
.createWithDefault(s"$INIT_CONTAINER_SECRET_VOLUME_MOUNT_PATH/" +
s"$INIT_CONTAINER_SUBMITTED_JARS_SECRET_KEY")

private[spark] val INIT_CONTAINER_DOWNLOAD_FILES_RESOURCE_IDENTIFIER =
ConfigBuilder("spark.kubernetes.driver.initcontainer.downloadFilesResourceIdentifier")
ConfigBuilder("spark.kubernetes.initcontainer.downloadFilesResourceIdentifier")
.doc("Identifier for the files tarball that was uploaded to the staging service.")
.internal()
.stringConf
.createOptional

private[spark] val INIT_CONTAINER_DOWNLOAD_FILES_SECRET_LOCATION =
ConfigBuilder("spark.kubernetes.driver.initcontainer.downloadFilesSecretLocation")
ConfigBuilder("spark.kubernetes.initcontainer.downloadFilesSecretLocation")
.doc("Location of the application secret to use when the init-container contacts the" +
" resource staging server to download files.")
.internal()
.stringConf
.createWithDefault(INIT_CONTAINER_DOWNLOAD_FILES_SECRET_PATH)
.createWithDefault(
s"$INIT_CONTAINER_SECRET_VOLUME_MOUNT_PATH/$INIT_CONTAINER_SUBMITTED_FILES_SECRET_KEY")

// Remote (non-staged) dependency lists, derived from spark.jars / spark.files.
private[spark] val INIT_CONTAINER_REMOTE_JARS =
ConfigBuilder("spark.kubernetes.initcontainer.remoteJars")
.doc("Comma-separated list of jar URIs to download in the init-container. This is" +
" calculated from spark.jars.")
.internal()
.stringConf
.createOptional

private[spark] val INIT_CONTAINER_REMOTE_FILES =
ConfigBuilder("spark.kubernetes.initcontainer.remoteFiles")
.doc("Comma-separated list of file URIs to download in the init-container. This is" +
" calculated from spark.files.")
.internal()
.stringConf
.createOptional

private[spark] val INIT_CONTAINER_DOCKER_IMAGE =
ConfigBuilder("spark.kubernetes.driver.initcontainer.docker.image")
.doc("Image for the driver's init-container that downloads mounted dependencies.")
ConfigBuilder("spark.kubernetes.initcontainer.docker.image")
.doc("Image for the driver and executor's init-container that downloads dependencies.")
.stringConf
.createWithDefault(s"spark-driver-init:$sparkVersion")
.createWithDefault(s"spark-init:$sparkVersion")

// NOTE(review): this section is a diff rendering. The DRIVER_LOCAL_* /
// DRIVER_MOUNT_DEPENDENCIES_* vals (old names) appear immediately before their
// INIT_CONTAINER_* replacements, and duplicate .createWithDefault / .doc lines
// show old-then-new values; only the newer of each pair belongs in the real
// source file.
// Download directories shared between the init-container and the main container.
private[spark] val DRIVER_LOCAL_JARS_DOWNLOAD_LOCATION =
ConfigBuilder("spark.kubernetes.driver.mountdependencies.jarsDownloadDir")
.doc("Location to download local jars to in the driver. When using spark-submit, this" +
" directory must be empty and will be mounted as an empty directory volume on the" +
" driver pod.")
private[spark] val INIT_CONTAINER_JARS_DOWNLOAD_LOCATION =
ConfigBuilder("spark.kubernetes.mountdependencies.jarsDownloadDir")
.doc("Location to download jars to in the driver and executors. When using" +
" spark-submit, this directory must be empty and will be mounted as an empty directory" +
" volume on the driver and executor pod.")
.stringConf
.createWithDefault("/var/spark-data/spark-local-jars")
.createWithDefault("/var/spark-data/spark-submitted-jars")

private[spark] val DRIVER_LOCAL_FILES_DOWNLOAD_LOCATION =
ConfigBuilder("spark.kubernetes.driver.mountdependencies.filesDownloadDir")
.doc("Location to download local files to in the driver. When using spark-submit, this" +
" directory must be empty and will be mounted as an empty directory volume on the" +
" driver pod.")
private[spark] val INIT_CONTAINER_FILES_DOWNLOAD_LOCATION =
ConfigBuilder("spark.kubernetes.mountdependencies.filesDownloadDir")
.doc("Location to download files to in the driver and executors. When using" +
" spark-submit, this directory must be empty and will be mounted as an empty directory" +
" volume on the driver and executor pods.")
.stringConf
.createWithDefault("/var/spark-data/spark-local-files")
.createWithDefault("/var/spark-data/spark-submitted-files")

private[spark] val DRIVER_MOUNT_DEPENDENCIES_INIT_TIMEOUT =
private[spark] val INIT_CONTAINER_MOUNT_TIMEOUT =
ConfigBuilder("spark.kubernetes.mountdependencies.mountTimeout")
.doc("Timeout before aborting the attempt to download and unpack local dependencies from" +
" the dependency staging server when initializing the driver pod.")
" remote locations and the resource staging server when initializing the driver and" +
" executor pods.")
.timeConf(TimeUnit.MINUTES)
.createWithDefault(5)

// Internal properties the submission client sets so executors can reuse the
// driver's init-container ConfigMap and staging-server secret.
private[spark] val EXECUTOR_INIT_CONTAINER_CONFIG_MAP =
ConfigBuilder("spark.kubernetes.initcontainer.executor.configmapname")
.doc("Name of the config map to use in the init-container that retrieves submitted files" +
" for the executor.")
.internal()
.stringConf
.createOptional

private[spark] val EXECUTOR_INIT_CONTAINER_CONFIG_MAP_KEY =
ConfigBuilder("spark.kubernetes.initcontainer.executor.configmapkey")
.doc("Key for the entry in the init container config map for submitted files that" +
" corresponds to the properties for this init-container.")
.internal()
.stringConf
.createOptional

private[spark] val EXECUTOR_INIT_CONTAINER_SECRET =
ConfigBuilder("spark.kubernetes.initcontainer.executor.stagingServerSecret.name")
.doc("Name of the secret to mount into the init-container that retrieves submitted files.")
.internal()
.stringConf
.createOptional

private[spark] val EXECUTOR_INIT_CONTAINER_SECRET_MOUNT_DIR =
ConfigBuilder("spark.kubernetes.initcontainer.executor.stagingServerSecret.mountDir")
.doc("Directory to mount the resource staging server secrets into for the executor" +
" init-containers. This must be exactly the same as the directory that the submission" +
" client mounted the secret into because the config map's properties specify the" +
" secret location as to be the same between the driver init-container and the executor" +
" init-container. Thus the submission client will always set this and the driver will" +
" never rely on a constant or convention, in order to protect against cases where the" +
" submission client has a different version from the driver itself, and hence might" +
" have different constants loaded in constants.scala.")
.internal()
.stringConf
.createOptional

private[spark] def resolveK8sMaster(rawMasterString: String): String = {
if (!rawMasterString.startsWith("k8s://")) {
throw new IllegalArgumentException("Master URL should start with k8s:// in Kubernetes mode.")
Expand Down
Loading

0 comments on commit 30597f6

Please sign in to comment.