Skip to content

Commit

Permalink
Make mainAppResource not optional.
Browse files Browse the repository at this point in the history
  • Loading branch information
Marcelo Vanzin committed Nov 1, 2018
1 parent 1a61f72 commit 91765ab
Show file tree
Hide file tree
Showing 14 changed files with 62 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,15 @@ private[spark] sealed trait KubernetesRoleSpecificConf
* Structure containing metadata for Kubernetes logic that builds a Spark driver.
*/
private[spark] case class KubernetesDriverSpecificConf(
mainAppResource: Option[MainAppResource],
mainAppResource: MainAppResource,
mainClass: String,
appName: String,
appArgs: Seq[String],
pyFiles: Seq[String] = Nil) extends KubernetesRoleSpecificConf
pyFiles: Seq[String] = Nil) extends KubernetesRoleSpecificConf {

require(mainAppResource != null, "Main resource must be provided.")

}

/*
* Structure containing metadata for Kubernetes logic that builds a Spark executor.
Expand Down Expand Up @@ -115,7 +119,7 @@ private[spark] object KubernetesConf {
appName: String,
appResourceNamePrefix: String,
appId: String,
mainAppResource: Option[MainAppResource],
mainAppResource: MainAppResource,
mainClass: String,
appArgs: Array[String],
maybePyFiles: Option[String],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,16 @@ private[spark] class BasicDriverFeatureStep(

// The memory overhead factor to use. If the user has not set it, then use a different
// value for non-JVM apps. This value is propagated to executors.
private val overheadFactor = conf.roleSpecificConf.mainAppResource match {
case Some(_: NonJVMResource) =>
private val overheadFactor =
if (conf.roleSpecificConf.mainAppResource.isInstanceOf[NonJVMResource]) {
if (conf.sparkConf.contains(MEMORY_OVERHEAD_FACTOR)) {
conf.get(MEMORY_OVERHEAD_FACTOR)
} else {
NON_JVM_MEMORY_OVERHEAD_FACTOR
}

case _ =>
} else {
conf.get(MEMORY_OVERHEAD_FACTOR)
}
}

private val memoryOverheadMiB = conf
.get(DRIVER_MEMORY_OVERHEAD)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,30 +39,27 @@ private[spark] class DriverCommandFeatureStep(conf: KubernetesConf[KubernetesDri

override def configurePod(pod: SparkPod): SparkPod = {
driverConf.mainAppResource match {
case Some(JavaMainAppResource(_)) | None =>
case JavaMainAppResource(_) =>
configureForJava(pod)

case Some(PythonMainAppResource(res)) =>
case PythonMainAppResource(res) =>
configureForPython(pod, res)

case Some(RMainAppResource(res)) =>
case RMainAppResource(res) =>
configureForR(pod, res)
}
}

override def getAdditionalPodSystemProperties(): Map[String, String] = {
driverConf.mainAppResource match {
case Some(JavaMainAppResource(res)) =>
additionalJavaProperties(res)
case JavaMainAppResource(res) =>
res.map(additionalJavaProperties).getOrElse(Map.empty)

case Some(PythonMainAppResource(res)) =>
case PythonMainAppResource(res) =>
additionalPythonProperties(res)

case Some(RMainAppResource(res)) =>
case RMainAppResource(res) =>
additionalRProperties(res)

case None =>
Map.empty
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ import org.apache.spark.util.Utils
* @param maybePyFiles additional Python files via --py-files
*/
private[spark] case class ClientArguments(
mainAppResource: Option[MainAppResource],
mainAppResource: MainAppResource,
mainClass: String,
driverArgs: Array[String],
maybePyFiles: Option[String],
Expand All @@ -53,18 +53,18 @@ private[spark] case class ClientArguments(
private[spark] object ClientArguments {

def fromCommandLineArgs(args: Array[String]): ClientArguments = {
var mainAppResource: Option[MainAppResource] = None
var mainAppResource: MainAppResource = JavaMainAppResource(None)
var mainClass: Option[String] = None
val driverArgs = mutable.ArrayBuffer.empty[String]
var maybePyFiles : Option[String] = None

args.sliding(2, 2).toList.foreach {
case Array("--primary-java-resource", primaryJavaResource: String) =>
mainAppResource = Some(JavaMainAppResource(primaryJavaResource))
mainAppResource = JavaMainAppResource(Some(primaryJavaResource))
case Array("--primary-py-file", primaryPythonResource: String) =>
mainAppResource = Some(PythonMainAppResource(primaryPythonResource))
mainAppResource = PythonMainAppResource(primaryPythonResource)
case Array("--primary-r-file", primaryRFile: String) =>
mainAppResource = Some(RMainAppResource(primaryRFile))
mainAppResource = RMainAppResource(primaryRFile)
case Array("--other-py-files", pyFiles: String) =>
maybePyFiles = Some(pyFiles)
case Array("--main-class", clazz: String) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ private[spark] sealed trait MainAppResource

private[spark] sealed trait NonJVMResource

private[spark] case class JavaMainAppResource(primaryResource: String) extends MainAppResource
private[spark] case class JavaMainAppResource(primaryResource: Option[String])
extends MainAppResource

private[spark] case class PythonMainAppResource(primaryResource: String)
extends MainAppResource with NonJVMResource
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class KubernetesConfSuite extends SparkFunSuite {
APP_NAME,
RESOURCE_NAME_PREFIX,
APP_ID,
mainAppResource = None,
mainAppResource = JavaMainAppResource(None),
MAIN_CLASS,
APP_ARGS,
maybePyFiles = None,
Expand All @@ -65,7 +65,6 @@ class KubernetesConfSuite extends SparkFunSuite {
assert(conf.sparkConf.getAll.toMap === sparkConf.getAll.toMap)
assert(conf.appResourceNamePrefix === RESOURCE_NAME_PREFIX)
assert(conf.roleSpecificConf.appName === APP_NAME)
assert(conf.roleSpecificConf.mainAppResource.isEmpty)
assert(conf.roleSpecificConf.mainClass === MAIN_CLASS)
assert(conf.roleSpecificConf.appArgs === APP_ARGS)
}
Expand Down Expand Up @@ -94,7 +93,7 @@ class KubernetesConfSuite extends SparkFunSuite {
APP_NAME,
RESOURCE_NAME_PREFIX,
APP_ID,
mainAppResource = None,
mainAppResource = JavaMainAppResource(None),
MAIN_CLASS,
APP_ARGS,
maybePyFiles = None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
new LocalObjectReferenceBuilder().withName(secret).build()
}
private val emptyDriverSpecificConf = KubernetesDriverSpecificConf(
None,
JavaMainAppResource(None),
APP_NAME,
MAIN_CLASS,
APP_ARGS)
Expand Down Expand Up @@ -144,7 +144,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
val javaKubernetesConf = KubernetesConf(
javaSparkConf,
KubernetesDriverSpecificConf(
Some(JavaMainAppResource("")),
JavaMainAppResource(None),
APP_NAME,
PY_MAIN_CLASS,
APP_ARGS),
Expand All @@ -161,7 +161,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
val pythonKubernetesConf = KubernetesConf(
pythonSparkConf,
KubernetesDriverSpecificConf(
Some(PythonMainAppResource("")),
PythonMainAppResource(""),
APP_NAME,
PY_MAIN_CLASS,
APP_ARGS),
Expand Down Expand Up @@ -220,12 +220,10 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
// Memory overhead tests. Tuples are:
// test name, main resource, overhead factor, expected factor
Seq(
("java w/o resource", None, None, MEMORY_OVERHEAD_FACTOR.defaultValue.get),
("java w/ resource", Some(JavaMainAppResource(null)), None,
MEMORY_OVERHEAD_FACTOR.defaultValue.get),
("python default", Some(PythonMainAppResource(null)), None, NON_JVM_MEMORY_OVERHEAD_FACTOR),
("python w/ override", Some(PythonMainAppResource(null)), Some(0.9d), 0.9d),
("r default", Some(RMainAppResource(null)), None, NON_JVM_MEMORY_OVERHEAD_FACTOR)
("java", JavaMainAppResource(None), None, MEMORY_OVERHEAD_FACTOR.defaultValue.get),
("python default", PythonMainAppResource(null), None, NON_JVM_MEMORY_OVERHEAD_FACTOR),
("python w/ override", PythonMainAppResource(null), Some(0.9d), 0.9d),
("r default", RMainAppResource(null), None, NON_JVM_MEMORY_OVERHEAD_FACTOR)
).foreach { case (name, resource, factor, expectedFactor) =>
test(s"memory overhead factor: $name") {
// Choose a driver memory where the default memory overhead is > MEMORY_OVERHEAD_MIN_MIB
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class DriverCommandFeatureStepSuite extends SparkFunSuite {
test("java resource") {
val mainResource = "local:///main.jar"
val spec = applyFeatureStep(
resource = Some(JavaMainAppResource(mainResource)),
resource = JavaMainAppResource(Some(mainResource)),
appArgs = Array("5", "7"))
assert(spec.pod.container.getArgs.asScala === List(
"driver",
Expand All @@ -62,7 +62,7 @@ class DriverCommandFeatureStepSuite extends SparkFunSuite {

val spec = applyFeatureStep(
conf = sparkConf,
resource = Some(PythonMainAppResource(mainResource)))
resource = PythonMainAppResource(mainResource))
assert(spec.pod.container.getArgs.asScala === List(
"driver",
"--properties-file", SPARK_CONF_PATH,
Expand Down Expand Up @@ -90,7 +90,7 @@ class DriverCommandFeatureStepSuite extends SparkFunSuite {
.set(PYSPARK_MAJOR_PYTHON_VERSION, "2")
val spec = applyFeatureStep(
conf = sparkConf,
resource = Some(PythonMainAppResource(mainResource)),
resource = PythonMainAppResource(mainResource),
appArgs = Array("5", "7", "9"),
pyFiles = pyFiles)

Expand All @@ -117,7 +117,7 @@ class DriverCommandFeatureStepSuite extends SparkFunSuite {
val mainResource = s"local://$expectedMainResource"

val spec = applyFeatureStep(
resource = Some(RMainAppResource(mainResource)),
resource = RMainAppResource(mainResource),
appArgs = Array("5", "7", "9"))

assert(spec.pod.container.getArgs.asScala === List(
Expand All @@ -129,7 +129,7 @@ class DriverCommandFeatureStepSuite extends SparkFunSuite {

private def applyFeatureStep(
conf: SparkConf = new SparkConf(false),
resource: Option[MainAppResource] = None,
resource: MainAppResource = JavaMainAppResource(None),
appArgs: Array[String] = Array(),
pyFiles: Seq[String] = Nil): KubernetesDriverSpec = {
val driverConf = new KubernetesDriverSpecificConf(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, SparkPod}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.submit.JavaMainAppResource
import org.apache.spark.util.Clock

class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
Expand Down Expand Up @@ -59,7 +60,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
KubernetesConf(
sparkConf,
KubernetesDriverSpecificConf(
None, "main", "app", Seq.empty),
JavaMainAppResource(None), "main", "app", Seq.empty),
SHORT_RESOURCE_NAME_PREFIX,
"app-id",
DRIVER_LABELS,
Expand Down Expand Up @@ -91,7 +92,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
.set(org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT, 8080)
.set(KUBERNETES_NAMESPACE, "my-namespace"),
KubernetesDriverSpecificConf(
None, "main", "app", Seq.empty),
JavaMainAppResource(None), "main", "app", Seq.empty),
SHORT_RESOURCE_NAME_PREFIX,
"app-id",
DRIVER_LABELS,
Expand All @@ -113,7 +114,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
KubernetesConf(
sparkConf,
KubernetesDriverSpecificConf(
None, "main", "app", Seq.empty),
JavaMainAppResource(None), "main", "app", Seq.empty),
SHORT_RESOURCE_NAME_PREFIX,
"app-id",
DRIVER_LABELS,
Expand Down Expand Up @@ -144,7 +145,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
KubernetesConf(
sparkConf.set(KUBERNETES_NAMESPACE, "my-namespace"),
KubernetesDriverSpecificConf(
None, "main", "app", Seq.empty),
JavaMainAppResource(None), "main", "app", Seq.empty),
LONG_RESOURCE_NAME_PREFIX,
"app-id",
DRIVER_LABELS,
Expand Down Expand Up @@ -172,7 +173,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
KubernetesConf(
sparkConf.set(org.apache.spark.internal.config.DRIVER_BIND_ADDRESS, "host"),
KubernetesDriverSpecificConf(
None, "main", "app", Seq.empty),
JavaMainAppResource(None), "main", "app", Seq.empty),
LONG_RESOURCE_NAME_PREFIX,
"app-id",
DRIVER_LABELS,
Expand All @@ -198,7 +199,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
KubernetesConf(
sparkConf,
KubernetesDriverSpecificConf(
None, "main", "app", Seq.empty),
JavaMainAppResource(None), "main", "app", Seq.empty),
LONG_RESOURCE_NAME_PREFIX,
"app-id",
DRIVER_LABELS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import org.scalatest.BeforeAndAfter
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesRoleSpecificConf, SparkPod}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.submit.JavaMainAppResource

class LocalDirsFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
private val defaultLocalDir = "/var/data/default-local-dir"
Expand All @@ -36,7 +37,7 @@ class LocalDirsFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
kubernetesConf = KubernetesConf(
sparkConf,
KubernetesDriverSpecificConf(
None,
JavaMainAppResource(None),
"app-name",
"main",
Seq.empty),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@ package org.apache.spark.deploy.k8s.features

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s._
import org.apache.spark.deploy.k8s.submit.JavaMainAppResource

class MountVolumesFeatureStepSuite extends SparkFunSuite {
private val sparkConf = new SparkConf(false)
private val emptyKubernetesConf = KubernetesConf(
sparkConf = sparkConf,
roleSpecificConf = KubernetesDriverSpecificConf(
None,
JavaMainAppResource(None),
"app-name",
"main",
Seq.empty),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import org.scalatest.BeforeAndAfter

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s._
import org.apache.spark.deploy.k8s.submit.JavaMainAppResource

class PodTemplateConfigMapStepSuite extends SparkFunSuite with BeforeAndAfter {
private var sparkConf: SparkConf = _
Expand All @@ -36,7 +37,7 @@ class PodTemplateConfigMapStepSuite extends SparkFunSuite with BeforeAndAfter {
kubernetesConf = KubernetesConf(
sparkConf,
KubernetesDriverSpecificConf(
None,
JavaMainAppResource(None),
"app-name",
"main",
Seq.empty),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpec, KubernetesDriverSpecificConf, SparkPod}
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.Fabric8Aliases._
import org.apache.spark.deploy.k8s.submit.JavaMainAppResource

class ClientSuite extends SparkFunSuite with BeforeAndAfter {

Expand Down Expand Up @@ -133,7 +134,7 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
sparkConf = new SparkConf(false)
kubernetesConf = KubernetesConf[KubernetesDriverSpecificConf](
sparkConf,
KubernetesDriverSpecificConf(None, MAIN_CLASS, APP_NAME, APP_ARGS),
KubernetesDriverSpecificConf(JavaMainAppResource(None), MAIN_CLASS, APP_NAME, APP_ARGS),
KUBERNETES_RESOURCE_PREFIX,
APP_ID,
Map.empty,
Expand Down
Loading

0 comments on commit 91765ab

Please sign in to comment.