Skip to content

Commit

Permalink
Fail submission if submitter-local files are provided without resourc… (
Browse files Browse the repository at this point in the history
#447)

* Fail submission if submitter-local files are provided without resource staging server URI

* Modified logic to validate only submitted jars; added orchestrator tests

* Incorporated feedback

* Fix failing test case
  • Loading branch information
Sahil Prasad authored and mccheah committed Sep 26, 2017
1 parent be394c6 commit b84056e
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import org.apache.spark.SparkConf
import org.apache.spark.deploy.kubernetes.{InitContainerResourceStagingServerSecretPluginImpl, OptionRequirements, SparkPodInitContainerBootstrapImpl}
import org.apache.spark.deploy.kubernetes.config._
import org.apache.spark.deploy.kubernetes.constants._
import org.apache.spark.deploy.kubernetes.submit.SubmittedDependencyUploaderImpl
import org.apache.spark.deploy.kubernetes.submit.{KubernetesFileUtils, SubmittedDependencyUploaderImpl}
import org.apache.spark.deploy.rest.kubernetes.{ResourceStagingServerSslOptionsProviderImpl, RetrofitClientFactoryImpl}
import org.apache.spark.util.Utils

Expand Down Expand Up @@ -62,6 +62,12 @@ private[spark] class InitContainerConfigurationStepsOrchestrator(
submissionSparkConf.get(RESOURCE_STAGING_SERVER_INTERNAL_SSL_ENABLED)
.orElse(submissionSparkConf.get(RESOURCE_STAGING_SERVER_SSL_ENABLED))
.getOrElse(false)

OptionRequirements.requireSecondIfFirstIsDefined(
KubernetesFileUtils.getOnlySubmitterLocalFiles(sparkJars).headOption,
resourceStagingServerUri,
"Local JARs were provided, however no resource staging server URI was found.")

OptionRequirements.requireNandDefined(
maybeResourceStagingServerInternalClientCert,
maybeResourceStagingServerInternalTrustStore,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,80 @@ class InitContainerConfigurationStepsOrchestratorSuite extends SparkFunSuite {
private val INIT_CONTAINER_CONFIG_MAP_KEY = "spark-init-config-map-key"
private val STAGING_SERVER_URI = "http://localhost:8000"

test ("error thrown if local jars provided without resource staging server") {
val sparkConf = new SparkConf(true)
.set(KUBERNETES_DRIVER_LABELS, s"$DEPRECATED_CUSTOM_LABEL_KEY=$DEPRECATED_CUSTOM_LABEL_VALUE")
.set(s"$KUBERNETES_DRIVER_LABEL_PREFIX$CUSTOM_LABEL_KEY", CUSTOM_LABEL_VALUE)

assert(sparkConf.get(RESOURCE_STAGING_SERVER_URI).isEmpty)

val thrown = intercept[IllegalArgumentException] {
val orchestrator = new InitContainerConfigurationStepsOrchestrator(
NAMESPACE,
APP_RESOURCE_PREFIX,
SPARK_JARS,
SPARK_FILES,
JARS_DOWNLOAD_PATH,
FILES_DOWNLOAD_PATH,
DOCKER_IMAGE_PULL_POLICY,
DRIVER_LABELS,
INIT_CONTAINER_CONFIG_MAP_NAME,
INIT_CONTAINER_CONFIG_MAP_KEY,
sparkConf)
}

assert(thrown.getMessage contains "Local JARs were provided, however no resource staging" +
" server URI was found.")
}

test ("error not thrown with non-local jars and resource staging server provided") {
val sparkConf = new SparkConf(true)
.set(KUBERNETES_DRIVER_LABELS, s"$DEPRECATED_CUSTOM_LABEL_KEY=$DEPRECATED_CUSTOM_LABEL_VALUE")
.set(s"$KUBERNETES_DRIVER_LABEL_PREFIX$CUSTOM_LABEL_KEY", CUSTOM_LABEL_VALUE)
.set(RESOURCE_STAGING_SERVER_URI, STAGING_SERVER_URI)

val orchestrator = new InitContainerConfigurationStepsOrchestrator(
NAMESPACE,
APP_RESOURCE_PREFIX,
SPARK_JARS.take(1),
SPARK_FILES,
JARS_DOWNLOAD_PATH,
FILES_DOWNLOAD_PATH,
DOCKER_IMAGE_PULL_POLICY,
DRIVER_LABELS,
INIT_CONTAINER_CONFIG_MAP_NAME,
INIT_CONTAINER_CONFIG_MAP_KEY,
sparkConf)
val initSteps : Seq[InitContainerConfigurationStep] =
orchestrator.getAllConfigurationSteps()
assert(initSteps.length == 2)
assert(initSteps.head.isInstanceOf[BaseInitContainerConfigurationStep])
assert(initSteps(1).isInstanceOf[SubmittedResourcesInitContainerConfigurationStep])
}

test ("error not thrown with non-local jars and no resource staging server provided") {
val sparkConf = new SparkConf(true)
.set(KUBERNETES_DRIVER_LABELS, s"$DEPRECATED_CUSTOM_LABEL_KEY=$DEPRECATED_CUSTOM_LABEL_VALUE")
.set(s"$KUBERNETES_DRIVER_LABEL_PREFIX$CUSTOM_LABEL_KEY", CUSTOM_LABEL_VALUE)

val orchestrator = new InitContainerConfigurationStepsOrchestrator(
NAMESPACE,
APP_RESOURCE_PREFIX,
SPARK_JARS.take(1),
SPARK_FILES,
JARS_DOWNLOAD_PATH,
FILES_DOWNLOAD_PATH,
DOCKER_IMAGE_PULL_POLICY,
DRIVER_LABELS,
INIT_CONTAINER_CONFIG_MAP_NAME,
INIT_CONTAINER_CONFIG_MAP_KEY,
sparkConf)
val initSteps : Seq[InitContainerConfigurationStep] =
orchestrator.getAllConfigurationSteps()
assert(initSteps.length == 1)
assert(initSteps.head.isInstanceOf[BaseInitContainerConfigurationStep])
}

test ("including step to contact resource staging server") {
val sparkConf = new SparkConf(true)
.set(KUBERNETES_DRIVER_LABELS, s"$DEPRECATED_CUSTOM_LABEL_KEY=$DEPRECATED_CUSTOM_LABEL_VALUE")
Expand Down Expand Up @@ -77,7 +151,7 @@ class InitContainerConfigurationStepsOrchestratorSuite extends SparkFunSuite {
val orchestrator = new InitContainerConfigurationStepsOrchestrator(
NAMESPACE,
APP_RESOURCE_PREFIX,
SPARK_JARS,
SPARK_JARS.take(1),
SPARK_FILES,
JARS_DOWNLOAD_PATH,
FILES_DOWNLOAD_PATH,
Expand Down

0 comments on commit b84056e

Please sign in to comment.