
[SPARK-22757][Kubernetes] Enable use of remote dependencies (http, s3, gcs, etc.) in Kubernetes mode #19954

Closed · wants to merge 14 commits
@@ -20,7 +20,6 @@ import java.util.concurrent.TimeUnit

import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.ConfigBuilder
import org.apache.spark.network.util.ByteUnit

private[spark] object Config extends Logging {

@@ -132,30 +131,84 @@ private[spark] object Config extends Logging {

val JARS_DOWNLOAD_LOCATION =
ConfigBuilder("spark.kubernetes.mountDependencies.jarsDownloadDir")
.doc("Location to download jars to in the driver and executors. When using" +
" spark-submit, this directory must be empty and will be mounted as an empty directory" +
" volume on the driver and executor pod.")
.doc("Location to download jars to in the driver and executors. When using " +
"spark-submit, this directory must be empty and will be mounted as an empty directory " +
"volume on the driver and executor pod.")
.stringConf
.createWithDefault("/var/spark-data/spark-jars")

val FILES_DOWNLOAD_LOCATION =
ConfigBuilder("spark.kubernetes.mountDependencies.filesDownloadDir")
.doc("Location to download files to in the driver and executors. When using" +
" spark-submit, this directory must be empty and will be mounted as an empty directory" +
" volume on the driver and executor pods.")
.doc("Location to download files to in the driver and executors. When using " +
"spark-submit, this directory must be empty and will be mounted as an empty directory " +
"volume on the driver and executor pods.")
.stringConf
.createWithDefault("/var/spark-data/spark-files")

val INIT_CONTAINER_IMAGE =
ConfigBuilder("spark.kubernetes.initContainer.image")
.doc("Image for the driver and executor's init-container for downloading dependencies.")
.stringConf
.createOptional

val INIT_CONTAINER_MOUNT_TIMEOUT =
ConfigBuilder("spark.kubernetes.mountDependencies.timeout")
Review comment (Contributor): nit: spark.kubernetes.initContainer.mountDependencies.timeout?

Author response: Please see the response regarding spark.kubernetes.mountDependencies.maxSimultaneousDownloads.

.doc("Timeout before aborting the attempt to download and unpack dependencies from remote " +
"locations into the driver and executor pods.")
.timeConf(TimeUnit.SECONDS)
.createWithDefault(300)

val INIT_CONTAINER_MAX_THREAD_POOL_SIZE =
ConfigBuilder("spark.kubernetes.mountDependencies.maxSimultaneousDownloads")
.doc("Maximum number of remote dependencies to download simultaneously in a driver or " +
"executor pod.")
.intConf
.createWithDefault(5)

val INIT_CONTAINER_REMOTE_JARS =
ConfigBuilder("spark.kubernetes.initContainer.remoteJars")
.doc("Comma-separated list of jar URIs to download in the init-container. This is " +
"calculated from spark.jars.")
.internal()
.stringConf
.createOptional

val INIT_CONTAINER_REMOTE_FILES =
ConfigBuilder("spark.kubernetes.initContainer.remoteFiles")
.doc("Comma-separated list of file URIs to download in the init-container. This is " +
"calculated from spark.files.")
.internal()
.stringConf
.createOptional

val INIT_CONTAINER_CONFIG_MAP_NAME =
ConfigBuilder("spark.kubernetes.initContainer.configMapName")
.doc("Name of the config map to use in the init-container that retrieves submitted files " +
"for the executor.")
.internal()
.stringConf
.createOptional

val INIT_CONTAINER_CONFIG_MAP_KEY_CONF =
ConfigBuilder("spark.kubernetes.initContainer.configMapKey")
.doc("Key for the entry in the init container config map for submitted files that " +
"corresponds to the properties for this init-container.")
.internal()
.stringConf
.createOptional

val KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX =
"spark.kubernetes.authenticate.submission"

val KUBERNETES_NODE_SELECTOR_PREFIX = "spark.kubernetes.node.selector."

val KUBERNETES_DRIVER_LABEL_PREFIX = "spark.kubernetes.driver.label."
val KUBERNETES_DRIVER_ANNOTATION_PREFIX = "spark.kubernetes.driver.annotation."
val KUBERNETES_DRIVER_SECRETS_PREFIX = "spark.kubernetes.driver.secrets."

val KUBERNETES_EXECUTOR_LABEL_PREFIX = "spark.kubernetes.executor.label."
val KUBERNETES_EXECUTOR_ANNOTATION_PREFIX = "spark.kubernetes.executor.annotation."
val KUBERNETES_EXECUTOR_SECRETS_PREFIX = "spark.kubernetes.executor.secrets."

val KUBERNETES_DRIVER_ENV_KEY = "spark.kubernetes.driverEnv."
}
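
To make the new options concrete, here is a minimal sketch of how a submission might set them on a SparkConf; the init-container image, the remote jar URL, and the values below are illustrative, not taken from this change:

import org.apache.spark.SparkConf

// Illustrative values only; the image name and remote jar URL are hypothetical.
val conf = new SparkConf()
  .set("spark.kubernetes.initContainer.image", "my-registry/spark-init:2.3.0")
  .set("spark.kubernetes.mountDependencies.jarsDownloadDir", "/var/spark-data/spark-jars")
  .set("spark.kubernetes.mountDependencies.timeout", "600s")
  .set("spark.kubernetes.mountDependencies.maxSimultaneousDownloads", "10")
  .set("spark.jars", "https://repo.example.com/libs/app-dep.jar") // remote jar fetched by the init-container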
@@ -69,6 +69,17 @@ private[spark] object Constants {
val ENV_DRIVER_JAVA_OPTS = "SPARK_DRIVER_JAVA_OPTS"
val ENV_DRIVER_BIND_ADDRESS = "SPARK_DRIVER_BIND_ADDRESS"
val ENV_DRIVER_MEMORY = "SPARK_DRIVER_MEMORY"
val ENV_MOUNTED_FILES_DIR = "SPARK_MOUNTED_FILES_DIR"

// Bootstrapping dependencies with the init-container
val INIT_CONTAINER_DOWNLOAD_JARS_VOLUME_NAME = "download-jars-volume"
val INIT_CONTAINER_DOWNLOAD_FILES_VOLUME_NAME = "download-files-volume"
val INIT_CONTAINER_PROPERTIES_FILE_VOLUME = "spark-init-properties"
val INIT_CONTAINER_PROPERTIES_FILE_DIR = "/etc/spark-init"
val INIT_CONTAINER_PROPERTIES_FILE_NAME = "spark-init.properties"
val INIT_CONTAINER_PROPERTIES_FILE_PATH =
s"$INIT_CONTAINER_PROPERTIES_FILE_DIR/$INIT_CONTAINER_PROPERTIES_FILE_NAME"
val INIT_CONTAINER_SECRET_VOLUME_NAME = "spark-init-secret"

// Miscellaneous
val KUBERNETES_MASTER_INTERNAL_URL = "https://kubernetes.default.svc"
@@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s

import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model.{ContainerBuilder, EmptyDirVolumeSource, EnvVarBuilder, PodBuilder, VolumeMount, VolumeMountBuilder}

import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._

/**
* Bootstraps an init-container for downloading remote dependencies. This is separated out from
* the init-container steps API because this component can be used to bootstrap init-containers
* for both the driver and executors.
*/
private[spark] class InitContainerBootstrap(
initContainerImage: String,
imagePullPolicy: String,
jarsDownloadPath: String,
filesDownloadPath: String,
configMapName: String,
configMapKey: String,
sparkRole: String,
sparkConf: SparkConf) {

/**
* Bootstraps an init-container that downloads dependencies to be used by a main container.
*/
def bootstrapInitContainer(
original: PodWithDetachedInitContainer): PodWithDetachedInitContainer = {
val sharedVolumeMounts = Seq[VolumeMount](
new VolumeMountBuilder()
.withName(INIT_CONTAINER_DOWNLOAD_JARS_VOLUME_NAME)
.withMountPath(jarsDownloadPath)
.build(),
new VolumeMountBuilder()
.withName(INIT_CONTAINER_DOWNLOAD_FILES_VOLUME_NAME)
.withMountPath(filesDownloadPath)
.build())

val customEnvVarKeyPrefix = sparkRole match {
case SPARK_POD_DRIVER_ROLE => KUBERNETES_DRIVER_ENV_KEY
case SPARK_POD_EXECUTOR_ROLE => "spark.executorEnv."
case _ => throw new SparkException(s"$sparkRole is not a valid Spark pod role")
}
val customEnvVars = sparkConf.getAllWithPrefix(customEnvVarKeyPrefix).toSeq.map {
case (key, value) =>
new EnvVarBuilder()
.withName(key)
.withValue(value)
.build()
}

val initContainer = new ContainerBuilder(original.initContainer)
.withName("spark-init")
.withImage(initContainerImage)
.withImagePullPolicy(imagePullPolicy)
.addAllToEnv(customEnvVars.asJava)
.addNewVolumeMount()
.withName(INIT_CONTAINER_PROPERTIES_FILE_VOLUME)
.withMountPath(INIT_CONTAINER_PROPERTIES_FILE_DIR)
.endVolumeMount()
.addToVolumeMounts(sharedVolumeMounts: _*)
.addToArgs(INIT_CONTAINER_PROPERTIES_FILE_PATH)
.build()

val podWithBasicVolumes = new PodBuilder(original.pod)
.editSpec()
.addNewVolume()
.withName(INIT_CONTAINER_PROPERTIES_FILE_VOLUME)
.withNewConfigMap()
.withName(configMapName)
.addNewItem()
.withKey(configMapKey)
.withPath(INIT_CONTAINER_PROPERTIES_FILE_NAME)
.endItem()
.endConfigMap()
.endVolume()
.addNewVolume()
.withName(INIT_CONTAINER_DOWNLOAD_JARS_VOLUME_NAME)
.withEmptyDir(new EmptyDirVolumeSource())
.endVolume()
.addNewVolume()
.withName(INIT_CONTAINER_DOWNLOAD_FILES_VOLUME_NAME)
.withEmptyDir(new EmptyDirVolumeSource())
.endVolume()
.endSpec()
.build()

val mainContainer = new ContainerBuilder(original.mainContainer)
.addToVolumeMounts(sharedVolumeMounts: _*)
.addNewEnv()
.withName(ENV_MOUNTED_FILES_DIR)
.withValue(filesDownloadPath)
.endEnv()
.build()

PodWithDetachedInitContainer(
podWithBasicVolumes,
initContainer,
mainContainer)
}
}
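
For orientation only, a hypothetical sketch of how a driver-side step might use this bootstrap; driverPod, driverContainer, sparkConf, and the image and config map names are placeholders rather than part of the change:

// Sketch under assumed names: wire up the bootstrap and apply it to a pod spec.
val bootstrap = new InitContainerBootstrap(
  initContainerImage = "my-registry/spark-init:latest", // hypothetical image
  imagePullPolicy = "IfNotPresent",
  jarsDownloadPath = "/var/spark-data/spark-jars",
  filesDownloadPath = "/var/spark-data/spark-files",
  configMapName = "spark-app-init-config",              // hypothetical config map name
  configMapKey = INIT_CONTAINER_PROPERTIES_FILE_NAME,
  sparkRole = SPARK_POD_DRIVER_ROLE,
  sparkConf = sparkConf)

val bootstrapped = bootstrap.bootstrapInitContainer(
  PodWithDetachedInitContainer(driverPod, new ContainerBuilder().build(), driverContainer))
// The detached init-container can then be attached to the pod, e.g. with
// KubernetesUtils.appendInitContainer(bootstrapped.pod, bootstrapped.initContainer).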
@@ -14,13 +14,49 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.submit
package org.apache.spark.deploy.k8s

import java.io.File

import io.fabric8.kubernetes.api.model.{Container, Pod, PodBuilder}

import org.apache.spark.SparkConf
import org.apache.spark.util.Utils

private[spark] object KubernetesFileUtils {
private[spark] object KubernetesUtils {

/**
* Extract and parse Spark configuration properties with a given name prefix and
* return the result as a Map. Keys must not have more than one value.
*
* @param sparkConf Spark configuration
* @param prefix the given property name prefix
* @return a Map storing the configuration property keys and values
*/
def parsePrefixedKeyValuePairs(
sparkConf: SparkConf,
prefix: String): Map[String, String] = {
sparkConf.getAllWithPrefix(prefix).toMap
}

def requireNandDefined(opt1: Option[_], opt2: Option[_], errMessage: String): Unit = {
opt1.foreach { _ => require(opt2.isEmpty, errMessage) }
}

/**
* Append the given init-container to a pod's list of init-containers.
*
* @param originalPodSpec original specification of the pod
* @param initContainer the init-container to add to the pod
* @return the pod with the init-container added to the list of InitContainers
*/
def appendInitContainer(originalPodSpec: Pod, initContainer: Container): Pod = {
new PodBuilder(originalPodSpec)
.editOrNewSpec()
.addToInitContainers(initContainer)
.endSpec()
.build()
}

/**
* For the given collection of file URIs, resolves them as follows:
@@ -47,6 +83,16 @@ private[spark] object KubernetesFileUtils {
}
}

/**
* From a given collection of file URIs, return only the ones that represent remote files.
*/
def getOnlyRemoteFiles(uris: Iterable[String]): Iterable[String] = {
uris.filter { uri =>
val scheme = Utils.resolveURI(uri).getScheme
scheme != "file" && scheme != "local"
}
}

private def resolveFileUri(
uri: String,
fileDownloadPath: String,
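To illustrate the new helpers above, a small hypothetical example; the option values and URIs are made up:

// requireNandDefined: it is an error for both options to be defined at the same time.
KubernetesUtils.requireNandDefined(
  Some("value"), None, "Only one of the two settings may be specified.") // passes
// requireNandDefined(Some("a"), Some("b"), "...") would throw an IllegalArgumentException.

// getOnlyRemoteFiles keeps URIs whose scheme is neither "file" nor "local".
val remote = KubernetesUtils.getOnlyRemoteFiles(Seq(
  "local:///opt/deps/a.jar",
  "file:///tmp/b.jar",
  "https://repo.example.com/c.jar",
  "s3a://bucket/d.jar"))
// remote == Seq("https://repo.example.com/c.jar", "s3a://bucket/d.jar")
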
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s

import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, Pod, PodBuilder}

/**
* Bootstraps a driver or executor container or an init-container with needed secrets mounted.
*/
private[spark] class MountSecretsBootstrap(secretNamesToMountPaths: Map[String, String]) {
Review comment (Contributor):

If I understand this class correctly, it seems like what you're trying to do here is to inject this logic into different steps that require this functionality. So the code that instantiates those steps needs to know about this dependency, and needs to know how to create both objects. Then the step implementation has to call this code.

Instead, wouldn't it be cleaner to make the step inherit this functionality?

e.g.

trait MountSecretsBootstrap(args) {
  def mountSecrets(...) { }
}

class InitContainerMountSecretsStep extends InitContainerConfigurationStep with MountSecretsBootstrap {

}

The same comment could be made about the init container boostrap.

But in both cases, I'm not sure this would work right now on the executor side, because as I mentioned it doesn't really use the same abstraction as the driver side. Which is kinda one of the problems with the current class hierarchy here...

Author response: I created https://issues.apache.org/jira/browse/SPARK-22839 to keep track of the refactoring work.


/**
* Mounts Kubernetes secrets as secret volumes into the given container in the given pod.
*
* @param pod the pod into which the secret volumes are being added.
* @param container the container into which the secret volumes are being mounted.
* @return the updated pod and container with the secrets mounted.
*/
def mountSecrets(pod: Pod, container: Container): (Pod, Container) = {
var podBuilder = new PodBuilder(pod)
secretNamesToMountPaths.keys.foreach { name =>
podBuilder = podBuilder
.editOrNewSpec()
.addNewVolume()
.withName(secretVolumeName(name))
.withNewSecret()
.withSecretName(name)
.endSecret()
.endVolume()
.endSpec()
}

var containerBuilder = new ContainerBuilder(container)
secretNamesToMountPaths.foreach { case (name, path) =>
containerBuilder = containerBuilder
.addNewVolumeMount()
.withName(secretVolumeName(name))
.withMountPath(path)
.endVolumeMount()
}

(podBuilder.build(), containerBuilder.build())
}

private def secretVolumeName(secretName: String): String = {
secretName + "-volume"
}
}
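
A hypothetical usage sketch for the bootstrap above; the secret names, mount paths, and the driverPod/driverContainer variables are illustrative:

// Mount two Kubernetes secrets into a driver pod and its main container.
val secretsBootstrap = new MountSecretsBootstrap(Map(
  "spark-db-credentials" -> "/mnt/secrets/db",
  "spark-oauth-token" -> "/mnt/secrets/oauth"))

val (podWithSecrets, containerWithSecrets) =
  secretsBootstrap.mountSecrets(driverPod, driverContainer)
// Each secret becomes a volume named "<secretName>-volume" mounted at its configured path.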