Skip to content

Commit

Permalink
[SPARK-25875][K8S] Merge code to set up driver command into a single …
Browse files Browse the repository at this point in the history
…step.

Right now there are 3 different classes dealing with building the driver
command to run inside the pod, one for each "binding" supported by Spark.
This has two main shortcomings:

- the code in the 3 classes is very similar; changing things in one place
  would probably mean making a similar change in the others.

- it gives the false impression that the step implementation is the only
  place where binding-specific logic is needed. That is not true; there
  was code in KubernetesConf that was binding-specific, and there's also
  code in the executor-specific config step. So the 3 classes weren't really
  working as a language-specific abstraction.

On top of that, the current code was propagating command line parameters in
a different way depending on the binding. That doesn't seem necessary, and
in fact using environment variables for command line parameters is in general
a really bad idea, since you can't handle special characters (e.g. spaces)
that way.

This change merges the 3 different code paths for Java, Python and R into
a single step, and also merges the 3 code paths to start the Spark driver
in the k8s entry point script. This increases the amount of shared code,
and also moves more feature logic into the step itself, so it doesn't live
in KubernetesConf.

Note that not all logic related to setting up the driver lives in that
step. For example, the memory overhead calculation still lives separately,
except it now happens in the driver config step instead of outside the
step hierarchy altogether.

Some of the noise in the diff is because of changes to KubernetesConf, which
will be addressed in a separate change.

Tested with new and updated unit tests + integration tests.

Author: Marcelo Vanzin <[email protected]>

Closes apache#22897 from vanzin/SPARK-25875.
  • Loading branch information
Marcelo Vanzin authored and jackylee-ch committed Feb 18, 2019
1 parent ba918d6 commit fca8abc
Show file tree
Hide file tree
Showing 32 changed files with 438 additions and 855 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package org.apache.spark.deploy.k8s

import java.util.concurrent.TimeUnit

import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.ConfigBuilder

Expand Down Expand Up @@ -125,34 +126,6 @@ private[spark] object Config extends Logging {
.stringConf
.createOptional

val KUBERNETES_PYSPARK_MAIN_APP_RESOURCE =
ConfigBuilder("spark.kubernetes.python.mainAppResource")
.doc("The main app resource for pyspark jobs")
.internal()
.stringConf
.createOptional

val KUBERNETES_PYSPARK_APP_ARGS =
ConfigBuilder("spark.kubernetes.python.appArgs")
.doc("The app arguments for PySpark Jobs")
.internal()
.stringConf
.createOptional

val KUBERNETES_R_MAIN_APP_RESOURCE =
ConfigBuilder("spark.kubernetes.r.mainAppResource")
.doc("The main app resource for SparkR jobs")
.internal()
.stringConf
.createOptional

val KUBERNETES_R_APP_ARGS =
ConfigBuilder("spark.kubernetes.r.appArgs")
.doc("The app arguments for SparkR Jobs")
.internal()
.stringConf
.createOptional

val KUBERNETES_ALLOCATION_BATCH_SIZE =
ConfigBuilder("spark.kubernetes.allocation.batch.size")
.doc("Number of pods to launch at once in each round of executor allocation.")
Expand Down Expand Up @@ -267,6 +240,7 @@ private[spark] object Config extends Logging {
.doc("This sets the resource type internally")
.internal()
.stringConf
.checkValues(Set(APP_RESOURCE_TYPE_JAVA, APP_RESOURCE_TYPE_PYTHON, APP_RESOURCE_TYPE_R))
.createOptional

val KUBERNETES_LOCAL_DIRS_TMPFS =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,8 @@ private[spark] object Constants {
val ENV_HADOOP_TOKEN_FILE_LOCATION = "HADOOP_TOKEN_FILE_LOCATION"

// BINDINGS
val ENV_PYSPARK_PRIMARY = "PYSPARK_PRIMARY"
val ENV_PYSPARK_FILES = "PYSPARK_FILES"
val ENV_PYSPARK_ARGS = "PYSPARK_APP_ARGS"
val ENV_PYSPARK_MAJOR_PYTHON_VERSION = "PYSPARK_MAJOR_PYTHON_VERSION"
val ENV_R_PRIMARY = "R_PRIMARY"
val ENV_R_ARGS = "R_APP_ARGS"

// Pod spec templates
val EXECUTOR_POD_SPEC_TEMPLATE_FILE_NAME = "pod-spec-template.yml"
Expand All @@ -88,6 +84,7 @@ private[spark] object Constants {
val DEFAULT_DRIVER_CONTAINER_NAME = "spark-kubernetes-driver"
val DEFAULT_EXECUTOR_CONTAINER_NAME = "spark-kubernetes-executor"
val MEMORY_OVERHEAD_MIN_MIB = 384L
val NON_JVM_MEMORY_OVERHEAD_FACTOR = 0.4d

// Hadoop Configuration
val HADOOP_FILE_VOLUME = "hadoop-properties"
Expand All @@ -113,4 +110,9 @@ private[spark] object Constants {
// Hadoop credentials secrets for the Spark app.
val SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR = "/mnt/secrets/hadoop-credentials"
val SPARK_APP_HADOOP_SECRET_VOLUME_NAME = "hadoop-secret"

// Application resource types.
val APP_RESOURCE_TYPE_JAVA = "java"
val APP_RESOURCE_TYPE_PYTHON = "python"
val APP_RESOURCE_TYPE_R = "r"
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.apache.spark.deploy.k8s.security.KubernetesHadoopDelegationTokenManag
import org.apache.spark.deploy.k8s.submit._
import org.apache.spark.deploy.k8s.submit.KubernetesClientApplication._
import org.apache.spark.internal.config.ConfigEntry
import org.apache.spark.util.Utils


private[spark] sealed trait KubernetesRoleSpecificConf
Expand All @@ -36,10 +37,15 @@ private[spark] sealed trait KubernetesRoleSpecificConf
* Structure containing metadata for Kubernetes logic that builds a Spark driver.
*/
private[spark] case class KubernetesDriverSpecificConf(
mainAppResource: Option[MainAppResource],
mainAppResource: MainAppResource,
mainClass: String,
appName: String,
appArgs: Seq[String]) extends KubernetesRoleSpecificConf
appArgs: Seq[String],
pyFiles: Seq[String] = Nil) extends KubernetesRoleSpecificConf {

require(mainAppResource != null, "Main resource must be provided.")

}

/*
* Structure containing metadata for Kubernetes logic that builds a Spark executor.
Expand Down Expand Up @@ -70,7 +76,6 @@ private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf](
roleSecretEnvNamesToKeyRefs: Map[String, String],
roleEnvs: Map[String, String],
roleVolumes: Iterable[KubernetesVolumeSpec[_ <: KubernetesVolumeSpecificConf]],
sparkFiles: Seq[String],
hadoopConfSpec: Option[HadoopConfSpec]) {

def hadoopConfigMapName: String = s"$appResourceNamePrefix-hadoop-config"
Expand All @@ -82,23 +87,6 @@ private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf](

def namespace(): String = sparkConf.get(KUBERNETES_NAMESPACE)

def sparkJars(): Seq[String] = sparkConf
.getOption("spark.jars")
.map(str => str.split(",").toSeq)
.getOrElse(Seq.empty[String])

def pyFiles(): Option[String] = sparkConf
.get(KUBERNETES_PYSPARK_PY_FILES)

def pySparkMainResource(): Option[String] = sparkConf
.get(KUBERNETES_PYSPARK_MAIN_APP_RESOURCE)

def pySparkPythonVersion(): String = sparkConf
.get(PYSPARK_MAJOR_PYTHON_VERSION)

def sparkRMainResource(): Option[String] = sparkConf
.get(KUBERNETES_R_MAIN_APP_RESOURCE)

def imagePullPolicy(): String = sparkConf.get(CONTAINER_IMAGE_PULL_POLICY)

def imagePullSecrets(): Seq[LocalObjectReference] = {
Expand Down Expand Up @@ -130,38 +118,11 @@ private[spark] object KubernetesConf {
appName: String,
appResourceNamePrefix: String,
appId: String,
mainAppResource: Option[MainAppResource],
mainAppResource: MainAppResource,
mainClass: String,
appArgs: Array[String],
maybePyFiles: Option[String],
hadoopConfDir: Option[String]): KubernetesConf[KubernetesDriverSpecificConf] = {
val sparkConfWithMainAppJar = sparkConf.clone()
val additionalFiles = mutable.ArrayBuffer.empty[String]
mainAppResource.foreach {
case JavaMainAppResource(res) =>
val previousJars = sparkConf
.getOption("spark.jars")
.map(_.split(","))
.getOrElse(Array.empty)
if (!previousJars.contains(res)) {
sparkConfWithMainAppJar.setJars(previousJars ++ Seq(res))
}
// The function of this outer match is to account for multiple nonJVM
// bindings that will all have increased default MEMORY_OVERHEAD_FACTOR to 0.4
case nonJVM: NonJVMResource =>
nonJVM match {
case PythonMainAppResource(res) =>
additionalFiles += res
maybePyFiles.foreach{maybePyFiles =>
additionalFiles.appendAll(maybePyFiles.split(","))}
sparkConfWithMainAppJar.set(KUBERNETES_PYSPARK_MAIN_APP_RESOURCE, res)
case RMainAppResource(res) =>
additionalFiles += res
sparkConfWithMainAppJar.set(KUBERNETES_R_MAIN_APP_RESOURCE, res)
}
sparkConfWithMainAppJar.setIfMissing(MEMORY_OVERHEAD_FACTOR, 0.4)
}

val driverCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
sparkConf, KUBERNETES_DRIVER_LABEL_PREFIX)
require(!driverCustomLabels.contains(SPARK_APP_ID_LABEL), "Label with key " +
Expand All @@ -188,11 +149,6 @@ private[spark] object KubernetesConf {
KubernetesVolumeUtils.parseVolumesWithPrefix(
sparkConf, KUBERNETES_EXECUTOR_VOLUMES_PREFIX).map(_.get)

val sparkFiles = sparkConf
.getOption("spark.files")
.map(str => str.split(",").toSeq)
.getOrElse(Seq.empty[String]) ++ additionalFiles

val hadoopConfigMapName = sparkConf.get(KUBERNETES_HADOOP_CONF_CONFIG_MAP)
KubernetesUtils.requireNandDefined(
hadoopConfDir,
Expand All @@ -205,10 +161,12 @@ private[spark] object KubernetesConf {
} else {
None
}
val pyFiles = maybePyFiles.map(Utils.stringToSeq).getOrElse(Nil)


KubernetesConf(
sparkConfWithMainAppJar,
KubernetesDriverSpecificConf(mainAppResource, mainClass, appName, appArgs),
sparkConf.clone(),
KubernetesDriverSpecificConf(mainAppResource, mainClass, appName, appArgs, pyFiles),
appResourceNamePrefix,
appId,
driverLabels,
Expand All @@ -217,7 +175,6 @@ private[spark] object KubernetesConf {
driverSecretEnvNamesToKeyRefs,
driverEnvs,
driverVolumes,
sparkFiles,
hadoopConfSpec)
}

Expand Down Expand Up @@ -274,7 +231,6 @@ private[spark] object KubernetesConf {
executorEnvSecrets,
executorEnv,
executorVolumes,
Seq.empty[String],
None)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.submit._
import org.apache.spark.internal.config._
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.Utils

private[spark] class BasicDriverFeatureStep(
conf: KubernetesConf[KubernetesDriverSpecificConf])
Expand All @@ -47,10 +48,23 @@ private[spark] class BasicDriverFeatureStep(

// Memory settings
private val driverMemoryMiB = conf.get(DRIVER_MEMORY)

// The memory overhead factor to use. If the user has not set it, then use a different
// value for non-JVM apps. This value is propagated to executors.
private val overheadFactor =
if (conf.roleSpecificConf.mainAppResource.isInstanceOf[NonJVMResource]) {
if (conf.sparkConf.contains(MEMORY_OVERHEAD_FACTOR)) {
conf.get(MEMORY_OVERHEAD_FACTOR)
} else {
NON_JVM_MEMORY_OVERHEAD_FACTOR
}
} else {
conf.get(MEMORY_OVERHEAD_FACTOR)
}

private val memoryOverheadMiB = conf
.get(DRIVER_MEMORY_OVERHEAD)
.getOrElse(math.max((conf.get(MEMORY_OVERHEAD_FACTOR) * driverMemoryMiB).toInt,
MEMORY_OVERHEAD_MIN_MIB))
.getOrElse(math.max((overheadFactor * driverMemoryMiB).toInt, MEMORY_OVERHEAD_MIN_MIB))
private val driverMemoryWithOverheadMiB = driverMemoryMiB + memoryOverheadMiB

override def configurePod(pod: SparkPod): SparkPod = {
Expand Down Expand Up @@ -134,20 +148,18 @@ private[spark] class BasicDriverFeatureStep(
KUBERNETES_DRIVER_POD_NAME.key -> driverPodName,
"spark.app.id" -> conf.appId,
KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key -> conf.appResourceNamePrefix,
KUBERNETES_DRIVER_SUBMIT_CHECK.key -> "true")

val resolvedSparkJars = KubernetesUtils.resolveFileUrisAndPath(
conf.sparkJars())
val resolvedSparkFiles = KubernetesUtils.resolveFileUrisAndPath(
conf.sparkFiles)
if (resolvedSparkJars.nonEmpty) {
additionalProps.put("spark.jars", resolvedSparkJars.mkString(","))
}
if (resolvedSparkFiles.nonEmpty) {
additionalProps.put("spark.files", resolvedSparkFiles.mkString(","))
KUBERNETES_DRIVER_SUBMIT_CHECK.key -> "true",
MEMORY_OVERHEAD_FACTOR.key -> overheadFactor.toString)

Seq("spark.jars", "spark.files").foreach { key =>
conf.getOption(key).foreach { value =>
val resolved = KubernetesUtils.resolveFileUrisAndPath(Utils.stringToSeq(value))
if (resolved.nonEmpty) {
additionalProps.put(key, resolved.mkString(","))
}
}
}

additionalProps.toMap
}

override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,13 @@ private[spark] class BasicExecutorFeatureStep(
(kubernetesConf.get(MEMORY_OVERHEAD_FACTOR) * executorMemoryMiB).toInt,
MEMORY_OVERHEAD_MIN_MIB))
private val executorMemoryWithOverhead = executorMemoryMiB + memoryOverheadMiB
private val executorMemoryTotal = kubernetesConf.sparkConf
.getOption(APP_RESOURCE_TYPE.key).map{ res =>
val additionalPySparkMemory = res match {
case "python" =>
kubernetesConf.sparkConf
.get(PYSPARK_EXECUTOR_MEMORY).map(_.toInt).getOrElse(0)
case _ => 0
}
executorMemoryWithOverhead + additionalPySparkMemory
}.getOrElse(executorMemoryWithOverhead)
private val executorMemoryTotal =
if (kubernetesConf.get(APP_RESOURCE_TYPE) == Some(APP_RESOURCE_TYPE_PYTHON)) {
executorMemoryWithOverhead +
kubernetesConf.get(PYSPARK_EXECUTOR_MEMORY).map(_.toInt).getOrElse(0)
} else {
executorMemoryWithOverhead
}

private val executorCores = kubernetesConf.sparkConf.getInt("spark.executor.cores", 1)
private val executorCoresRequest =
Expand Down Expand Up @@ -187,8 +184,4 @@ private[spark] class BasicExecutorFeatureStep(

SparkPod(executorPod, containerWithLimitCores)
}

override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty

override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty
}
Loading

0 comments on commit fca8abc

Please sign in to comment.