Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-25875][k8s] Merge code to set up driver command into a single step. #22897

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -125,34 +125,6 @@ private[spark] object Config extends Logging {
.stringConf
.createOptional

val KUBERNETES_PYSPARK_MAIN_APP_RESOURCE =
ConfigBuilder("spark.kubernetes.python.mainAppResource")
.doc("The main app resource for pyspark jobs")
.internal()
.stringConf
.createOptional

val KUBERNETES_PYSPARK_APP_ARGS =
ConfigBuilder("spark.kubernetes.python.appArgs")
.doc("The app arguments for PySpark Jobs")
.internal()
.stringConf
.createOptional

val KUBERNETES_R_MAIN_APP_RESOURCE =
ConfigBuilder("spark.kubernetes.r.mainAppResource")
.doc("The main app resource for SparkR jobs")
.internal()
.stringConf
.createOptional

val KUBERNETES_R_APP_ARGS =
ConfigBuilder("spark.kubernetes.r.appArgs")
.doc("The app arguments for SparkR Jobs")
.internal()
.stringConf
.createOptional

val KUBERNETES_ALLOCATION_BATCH_SIZE =
ConfigBuilder("spark.kubernetes.allocation.batch.size")
.doc("Number of pods to launch at once in each round of executor allocation.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,8 @@ private[spark] object Constants {
val ENV_HADOOP_TOKEN_FILE_LOCATION = "HADOOP_TOKEN_FILE_LOCATION"

// BINDINGS
val ENV_PYSPARK_PRIMARY = "PYSPARK_PRIMARY"
val ENV_PYSPARK_FILES = "PYSPARK_FILES"
val ENV_PYSPARK_ARGS = "PYSPARK_APP_ARGS"
val ENV_PYSPARK_MAJOR_PYTHON_VERSION = "PYSPARK_MAJOR_PYTHON_VERSION"
val ENV_R_PRIMARY = "R_PRIMARY"
val ENV_R_ARGS = "R_APP_ARGS"

// Pod spec templates
val EXECUTOR_POD_SPEC_TEMPLATE_FILE_NAME = "pod-spec-template.yml"
Expand All @@ -88,6 +84,7 @@ private[spark] object Constants {
val DEFAULT_DRIVER_CONTAINER_NAME = "spark-kubernetes-driver"
val DEFAULT_EXECUTOR_CONTAINER_NAME = "spark-kubernetes-executor"
val MEMORY_OVERHEAD_MIN_MIB = 384L
val NON_JVM_MEMORY_OVERHEAD_FACTOR = 0.4d

// Hadoop Configuration
val HADOOP_FILE_VOLUME = "hadoop-properties"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import org.apache.spark.deploy.k8s.submit._
import org.apache.spark.deploy.k8s.submit.KubernetesClientApplication._
import org.apache.spark.deploy.security.HadoopDelegationTokenManager
import org.apache.spark.internal.config.ConfigEntry
import org.apache.spark.util.Utils


private[spark] sealed trait KubernetesRoleSpecificConf
Expand All @@ -40,7 +41,8 @@ private[spark] case class KubernetesDriverSpecificConf(
mainAppResource: Option[MainAppResource],
mainClass: String,
appName: String,
appArgs: Seq[String]) extends KubernetesRoleSpecificConf
appArgs: Seq[String],
pyFiles: Seq[String] = Nil) extends KubernetesRoleSpecificConf

/*
* Structure containing metadata for Kubernetes logic that builds a Spark executor.
Expand Down Expand Up @@ -71,7 +73,6 @@ private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf](
roleSecretEnvNamesToKeyRefs: Map[String, String],
roleEnvs: Map[String, String],
roleVolumes: Iterable[KubernetesVolumeSpec[_ <: KubernetesVolumeSpecificConf]],
sparkFiles: Seq[String],
hadoopConfSpec: Option[HadoopConfSpec]) {

def hadoopConfigMapName: String = s"$appResourceNamePrefix-hadoop-config"
Expand All @@ -83,23 +84,6 @@ private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf](

def namespace(): String = sparkConf.get(KUBERNETES_NAMESPACE)

def sparkJars(): Seq[String] = sparkConf
.getOption("spark.jars")
.map(str => str.split(",").toSeq)
.getOrElse(Seq.empty[String])

def pyFiles(): Option[String] = sparkConf
.get(KUBERNETES_PYSPARK_PY_FILES)

def pySparkMainResource(): Option[String] = sparkConf
.get(KUBERNETES_PYSPARK_MAIN_APP_RESOURCE)

def pySparkPythonVersion(): String = sparkConf
.get(PYSPARK_MAJOR_PYTHON_VERSION)

def sparkRMainResource(): Option[String] = sparkConf
.get(KUBERNETES_R_MAIN_APP_RESOURCE)

def imagePullPolicy(): String = sparkConf.get(CONTAINER_IMAGE_PULL_POLICY)

def imagePullSecrets(): Seq[LocalObjectReference] = {
Expand Down Expand Up @@ -136,33 +120,6 @@ private[spark] object KubernetesConf {
appArgs: Array[String],
maybePyFiles: Option[String],
hadoopConfDir: Option[String]): KubernetesConf[KubernetesDriverSpecificConf] = {
val sparkConfWithMainAppJar = sparkConf.clone()
val additionalFiles = mutable.ArrayBuffer.empty[String]
mainAppResource.foreach {
case JavaMainAppResource(res) =>
val previousJars = sparkConf
.getOption("spark.jars")
.map(_.split(","))
.getOrElse(Array.empty)
if (!previousJars.contains(res)) {
sparkConfWithMainAppJar.setJars(previousJars ++ Seq(res))
}
// The function of this outer match is to account for multiple nonJVM
// bindings that will all have increased default MEMORY_OVERHEAD_FACTOR to 0.4
case nonJVM: NonJVMResource =>
nonJVM match {
case PythonMainAppResource(res) =>
additionalFiles += res
maybePyFiles.foreach{maybePyFiles =>
additionalFiles.appendAll(maybePyFiles.split(","))}
sparkConfWithMainAppJar.set(KUBERNETES_PYSPARK_MAIN_APP_RESOURCE, res)
case RMainAppResource(res) =>
additionalFiles += res
sparkConfWithMainAppJar.set(KUBERNETES_R_MAIN_APP_RESOURCE, res)
}
sparkConfWithMainAppJar.setIfMissing(MEMORY_OVERHEAD_FACTOR, 0.4)
}

val driverCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
sparkConf, KUBERNETES_DRIVER_LABEL_PREFIX)
require(!driverCustomLabels.contains(SPARK_APP_ID_LABEL), "Label with key " +
Expand All @@ -189,11 +146,6 @@ private[spark] object KubernetesConf {
KubernetesVolumeUtils.parseVolumesWithPrefix(
sparkConf, KUBERNETES_EXECUTOR_VOLUMES_PREFIX).map(_.get)

val sparkFiles = sparkConf
.getOption("spark.files")
.map(str => str.split(",").toSeq)
.getOrElse(Seq.empty[String]) ++ additionalFiles

val hadoopConfigMapName = sparkConf.get(KUBERNETES_HADOOP_CONF_CONFIG_MAP)
KubernetesUtils.requireNandDefined(
hadoopConfDir,
Expand All @@ -206,10 +158,12 @@ private[spark] object KubernetesConf {
} else {
None
}
val pyFiles = maybePyFiles.map(Utils.stringToSeq).getOrElse(Nil)


KubernetesConf(
sparkConfWithMainAppJar,
KubernetesDriverSpecificConf(mainAppResource, mainClass, appName, appArgs),
sparkConf.clone(),
KubernetesDriverSpecificConf(mainAppResource, mainClass, appName, appArgs, pyFiles),
appResourceNamePrefix,
appId,
driverLabels,
Expand All @@ -218,7 +172,6 @@ private[spark] object KubernetesConf {
driverSecretEnvNamesToKeyRefs,
driverEnvs,
driverVolumes,
sparkFiles,
hadoopConfSpec)
}

Expand Down Expand Up @@ -275,7 +228,6 @@ private[spark] object KubernetesConf {
executorEnvSecrets,
executorEnv,
executorVolumes,
Seq.empty[String],
None)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.submit._
import org.apache.spark.internal.config._
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.Utils

private[spark] class BasicDriverFeatureStep(
conf: KubernetesConf[KubernetesDriverSpecificConf])
Expand All @@ -47,10 +48,24 @@ private[spark] class BasicDriverFeatureStep(

// Memory settings
private val driverMemoryMiB = conf.get(DRIVER_MEMORY)

// The memory overhead factor to use. If the user has not set it, then use a different
// value for non-JVM apps. This value is propagated to executors.
private val overheadFactor = conf.roleSpecificConf.mainAppResource match {
case Some(_: NonJVMResource) =>
vanzin marked this conversation as resolved.
Show resolved Hide resolved
if (conf.sparkConf.contains(MEMORY_OVERHEAD_FACTOR)) {
conf.get(MEMORY_OVERHEAD_FACTOR)
} else {
NON_JVM_MEMORY_OVERHEAD_FACTOR
}

case _ =>
conf.get(MEMORY_OVERHEAD_FACTOR)
}

private val memoryOverheadMiB = conf
.get(DRIVER_MEMORY_OVERHEAD)
.getOrElse(math.max((conf.get(MEMORY_OVERHEAD_FACTOR) * driverMemoryMiB).toInt,
MEMORY_OVERHEAD_MIN_MIB))
.getOrElse(math.max((overheadFactor * driverMemoryMiB).toInt, MEMORY_OVERHEAD_MIN_MIB))
private val driverMemoryWithOverheadMiB = driverMemoryMiB + memoryOverheadMiB

override def configurePod(pod: SparkPod): SparkPod = {
Expand Down Expand Up @@ -134,20 +149,18 @@ private[spark] class BasicDriverFeatureStep(
KUBERNETES_DRIVER_POD_NAME.key -> driverPodName,
"spark.app.id" -> conf.appId,
KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key -> conf.appResourceNamePrefix,
KUBERNETES_DRIVER_SUBMIT_CHECK.key -> "true")

val resolvedSparkJars = KubernetesUtils.resolveFileUrisAndPath(
conf.sparkJars())
val resolvedSparkFiles = KubernetesUtils.resolveFileUrisAndPath(
conf.sparkFiles)
if (resolvedSparkJars.nonEmpty) {
additionalProps.put("spark.jars", resolvedSparkJars.mkString(","))
}
if (resolvedSparkFiles.nonEmpty) {
additionalProps.put("spark.files", resolvedSparkFiles.mkString(","))
KUBERNETES_DRIVER_SUBMIT_CHECK.key -> "true",
MEMORY_OVERHEAD_FACTOR.key -> overheadFactor.toString)

Seq("spark.jars", "spark.files").foreach { key =>
conf.getOption(key).foreach { value =>
val resolved = KubernetesUtils.resolveFileUrisAndPath(Utils.stringToSeq(value))
if (resolved.nonEmpty) {
additionalProps.put(key, resolved.mkString(","))
}
}
}

additionalProps.toMap
}

override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,13 @@ private[spark] class BasicExecutorFeatureStep(
(kubernetesConf.get(MEMORY_OVERHEAD_FACTOR) * executorMemoryMiB).toInt,
MEMORY_OVERHEAD_MIN_MIB))
private val executorMemoryWithOverhead = executorMemoryMiB + memoryOverheadMiB
private val executorMemoryTotal = kubernetesConf.sparkConf
.getOption(APP_RESOURCE_TYPE.key).map{ res =>
val additionalPySparkMemory = res match {
case "python" =>
kubernetesConf.sparkConf
.get(PYSPARK_EXECUTOR_MEMORY).map(_.toInt).getOrElse(0)
case _ => 0
}
executorMemoryWithOverhead + additionalPySparkMemory
}.getOrElse(executorMemoryWithOverhead)
private val executorMemoryTotal = kubernetesConf.get(APP_RESOURCE_TYPE) match {
case Some("python") =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was under the impression that generally we don' want to match against option types - instead we should be using option.map.getOrElse? More just my impression of the Scala idiomatic style than anything.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We use both depending on the context. In this case, I think this is more readable than the previous code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(I can use an if here if you prefer it. Probably better than either.)

executorMemoryWithOverhead +
kubernetesConf.get(PYSPARK_EXECUTOR_MEMORY).map(_.toInt).getOrElse(0)

case _ => executorMemoryWithOverhead
}

private val executorCores = kubernetesConf.sparkConf.getInt("spark.executor.cores", 1)
private val executorCoresRequest =
Expand Down Expand Up @@ -187,8 +184,4 @@ private[spark] class BasicExecutorFeatureStep(

SparkPod(executorPod, containerWithLimitCores)
}

override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty

override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty
}
Loading