diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index 98393cbbbba2d..b18987f6af4a4 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -558,6 +558,22 @@ from the other deployment modes. See the [configuration page](configuration.html
     disk as a secret into the init-containers.
   </td>
 </tr>
+<tr>
+  <td><code>spark.kubernetes.mountdependencies.jarsDownloadDir</code></td>
+  <td><code>/var/spark-data/spark-jars</code></td>
+  <td>
+    Location to download jars to in the driver and executors. This will be mounted as an empty directory volume
+    into the driver and executor containers.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.kubernetes.mountdependencies.filesDownloadDir</code></td>
+  <td><code>/var/spark-data/spark-files</code></td>
+  <td>
+    Location to download files to in the driver and executors. This will be mounted as an empty directory volume
+    into the driver and executor containers.
+  </td>
+</tr>
 <tr>
   <td><code>spark.kubernetes.report.interval</code></td>
   <td><code>1s</code></td>
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/SparkPodInitContainerBootstrap.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/SparkPodInitContainerBootstrap.scala
index 0d4e82566643d..a4d0aeb23d01f 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/SparkPodInitContainerBootstrap.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/SparkPodInitContainerBootstrap.scala
@@ -93,6 +93,10 @@ private[spark] class SparkPodInitContainerBootstrapImpl(
         .endVolume()
       .editMatchingContainer(new ContainerNameEqualityPredicate(mainContainerName))
         .addToVolumeMounts(sharedVolumeMounts: _*)
+        .addNewEnv()
+          .withName(ENV_MOUNTED_FILES_DIR)
+          .withValue(filesDownloadPath)
+          .endEnv()
         .endContainer()
       .endSpec()
     resourceStagingServerSecretPlugin.map { plugin =>
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala
index bcb9a96cae960..c892b01314975 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala
@@ -447,7 +447,7 @@ package object config extends Logging {
         " spark-submit, this directory must be empty and will be mounted as an empty directory" +
         " volume on the driver and executor pod.")
       .stringConf
-      .createWithDefault("/var/spark-data/spark-submitted-jars")
+      .createWithDefault("/var/spark-data/spark-jars")
 
   private[spark] val INIT_CONTAINER_FILES_DOWNLOAD_LOCATION =
     ConfigBuilder("spark.kubernetes.mountdependencies.filesDownloadDir")
@@ -455,7 +455,7 @@ package object config extends Logging {
         " spark-submit, this directory must be empty and will be mounted as an empty directory" +
         " volume on the driver and executor pods.")
       .stringConf
-      .createWithDefault("/var/spark-data/spark-submitted-files")
+      .createWithDefault("/var/spark-data/spark-files")
 
   private[spark] val INIT_CONTAINER_MOUNT_TIMEOUT =
     ConfigBuilder("spark.kubernetes.mountdependencies.mountTimeout")
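For context on the renamed defaults above, a minimal sketch (not part of this patch) of overriding both locations from the submission side. The property keys come straight from the ConfigBuilder entries; the values are illustrative and must name empty directories, since they are mounted as emptyDir volumes:

    import org.apache.spark.SparkConf

    // Hypothetical overrides of the download locations; any custom path must
    // be an empty directory inside the driver and executor images.
    val conf = new SparkConf()
      .set("spark.kubernetes.mountdependencies.jarsDownloadDir", "/var/spark-data/spark-jars")
      .set("spark.kubernetes.mountdependencies.filesDownloadDir", "/var/spark-data/spark-files")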
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala
index ea11ca2ec8f21..5515e88a50fb0 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala
@@ -92,6 +92,7 @@ package object constants {
   private[spark] val ENV_DRIVER_MAIN_CLASS = "SPARK_DRIVER_CLASS"
   private[spark] val ENV_DRIVER_ARGS = "SPARK_DRIVER_ARGS"
   private[spark] val ENV_DRIVER_JAVA_OPTS = "SPARK_DRIVER_JAVA_OPTS"
+  private[spark] val ENV_MOUNTED_FILES_DIR = "SPARK_MOUNTED_FILES_DIR"
 
   // Annotation keys
   private[spark] val ANNOTATION_PROVIDE_EXTERNAL_URI =
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/SparkPodInitContainerBootstrapSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/SparkPodInitContainerBootstrapSuite.scala
index 6db7d3ff2da53..3feba80f800c7 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/SparkPodInitContainerBootstrapSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/SparkPodInitContainerBootstrapSuite.scala
@@ -111,6 +111,16 @@ class SparkPodInitContainerBootstrapSuite extends SparkFunSuite with BeforeAndAf
     })
   }
 
+  test("Files download path is set as environment variable") {
+    val bootstrappedPod = bootstrapPodWithoutSubmittedDependencies()
+    val containers = bootstrappedPod.getSpec.getContainers.asScala
+    val maybeMainContainer = containers.find(_.getName === MAIN_CONTAINER_NAME)
+    assert(maybeMainContainer.exists { mainContainer =>
+      mainContainer.getEnv.asScala.exists(envVar =>
+        envVar.getName == ENV_MOUNTED_FILES_DIR && envVar.getValue == FILES_DOWNLOAD_PATH)
+    })
+  }
+
   test("Running with submitted dependencies modifies the init container with the plugin.") {
     val bootstrappedPod = bootstrapPodWithSubmittedDependencies()
     val podAnnotations = bootstrappedPod.getMetadata.getAnnotations.asScala
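At runtime, the new constant surfaces as an ordinary environment variable on the main container. A minimal sketch, assuming driver-side code wants to consume it defensively (it is only present when the init-container bootstrap above ran):

    // SPARK_MOUNTED_FILES_DIR is optional: it is set by
    // SparkPodInitContainerBootstrapImpl, not guaranteed in every pod.
    val mountedFilesDir: Option[String] = sys.env.get("SPARK_MOUNTED_FILES_DIR")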
diff --git a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver/Dockerfile b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver/Dockerfile
index 40f9459dc06dc..c4c75642c9d22 100644
--- a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver/Dockerfile
+++ b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver/Dockerfile
@@ -40,4 +40,5 @@ CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \
     if ! [ -z ${SPARK_MOUNTED_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_MOUNTED_CLASSPATH:$SPARK_CLASSPATH"; fi && \
     if ! [ -z ${SPARK_SUBMIT_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_SUBMIT_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
     if ! [ -z ${SPARK_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
+    if ! [ -z ${SPARK_MOUNTED_FILES_DIR} ]; then cp -R "$SPARK_MOUNTED_FILES_DIR/." .; fi && \
     exec ${JAVA_HOME}/bin/java $SPARK_DRIVER_JAVA_OPTS -cp $SPARK_CLASSPATH -Xms$SPARK_DRIVER_MEMORY -Xmx$SPARK_DRIVER_MEMORY $SPARK_DRIVER_CLASS $SPARK_DRIVER_ARGS
diff --git a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor/Dockerfile b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor/Dockerfile
index c5f1c43ff7cf4..e345f10056522 100644
--- a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor/Dockerfile
+++ b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor/Dockerfile
@@ -40,4 +40,5 @@ WORKDIR /opt/spark
 CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \
     if ! [ -z ${SPARK_MOUNTED_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_MOUNTED_CLASSPATH:$SPARK_CLASSPATH"; fi && \
     if ! [ -z ${SPARK_EXECUTOR_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXECUTOR_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
+    if ! [ -z ${SPARK_MOUNTED_FILES_DIR} ]; then cp -R "$SPARK_MOUNTED_FILES_DIR/." .; fi && \
     exec ${JAVA_HOME}/bin/java -Dspark.executor.port=$SPARK_EXECUTOR_PORT -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp $SPARK_CLASSPATH org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP
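Because both entrypoints copy "$SPARK_MOUNTED_FILES_DIR/." into the working directory before exec-ing the JVM, submitted files become readable by relative path. A minimal sketch, assuming a file named input.txt was shipped via spark.files:

    import java.nio.charset.StandardCharsets
    import java.nio.file.{Files, Paths}

    // "input.txt" is illustrative; relative paths resolve against the pod's
    // working directory (/opt/spark), where the entrypoint copied the files.
    val contents = new String(Files.readAllBytes(Paths.get("input.txt")), StandardCharsets.UTF_8)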
diff --git a/resource-managers/kubernetes/integration-tests-spark-jobs/src/main/scala/org/apache/spark/deploy/kubernetes/integrationtest/jobs/FileExistenceTest.scala b/resource-managers/kubernetes/integration-tests-spark-jobs/src/main/scala/org/apache/spark/deploy/kubernetes/integrationtest/jobs/FileExistenceTest.scala
index 8b8d5e05f6479..8994c998bffee 100644
--- a/resource-managers/kubernetes/integration-tests-spark-jobs/src/main/scala/org/apache/spark/deploy/kubernetes/integrationtest/jobs/FileExistenceTest.scala
+++ b/resource-managers/kubernetes/integration-tests-spark-jobs/src/main/scala/org/apache/spark/deploy/kubernetes/integrationtest/jobs/FileExistenceTest.scala
@@ -28,7 +28,9 @@ private[spark] object FileExistenceTest {
 
   def main(args: Array[String]): Unit = {
     if (args.length < 2) {
-      throw new IllegalArgumentException("Usage: WordCount ")
+      throw new IllegalArgumentException(
+        s"Invalid args: ${args.mkString(" ")}, " +
+          "Usage: FileExistenceTest <source-file> <expected-contents>")
     }
     // Can't use SparkContext.textFile since the file is local to the driver
     val file = Paths.get(args(0)).toFile
@@ -39,16 +41,15 @@ private[spark] object FileExistenceTest {
       val contents = Files.toString(file, Charsets.UTF_8)
       if (args(1) != contents) {
         throw new SparkException(s"Contents do not match. Expected: ${args(1)}," +
-          s" actual, $contents")
+          s" actual: $contents")
       } else {
         println(s"File found at ${file.getAbsolutePath} with correct contents.")
       }
       // scalastyle:on println
     }
-    val spark = SparkSession.builder()
-      .appName("Test")
-      .getOrCreate()
-    spark.stop()
+    while (true) {
+      Thread.sleep(600000)
+    }
   }
 }
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala
index d23bfcdbc5251..95775d262a69d 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala
@@ -16,6 +16,7 @@
  */
 package org.apache.spark.deploy.kubernetes.integrationtest
 
+import java.io.File
 import java.nio.file.Paths
 import java.util.UUID
 
@@ -35,11 +36,11 @@ import org.apache.spark.deploy.kubernetes.integrationtest.backend.minikube.Minik
 import org.apache.spark.deploy.kubernetes.integrationtest.constants.MINIKUBE_TEST_BACKEND
 import org.apache.spark.deploy.kubernetes.submit.{Client, KeyAndCertPem}
 import org.apache.spark.launcher.SparkLauncher
+import org.apache.spark.util.Utils
 
 private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
   import KubernetesSuite._
   private val testBackend = IntegrationTestBackendFactory.getTestBackend()
-  private val APP_LOCATOR_LABEL = UUID.randomUUID().toString.replaceAll("-", "")
   private var kubernetesTestComponents: KubernetesTestComponents = _
   private var sparkConf: SparkConf = _
@@ -124,7 +125,11 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
     sparkConf.set("spark.kubernetes.shuffle.labels", "app=spark-shuffle-service")
     sparkConf.set("spark.kubernetes.shuffle.namespace", kubernetesTestComponents.namespace)
     sparkConf.set("spark.app.name", "group-by-test")
-    runSparkGroupByTestAndVerifyCompletion(SUBMITTER_LOCAL_MAIN_APP_RESOURCE)
+    runSparkApplicationAndVerifyCompletion(
+      SUBMITTER_LOCAL_MAIN_APP_RESOURCE,
+      GROUP_BY_MAIN_CLASS,
+      "The Result is",
+      Array.empty[String])
   }
 
   test("Use remote resources without the resource staging server.") {
@@ -173,6 +178,20 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
     runSparkPiAndVerifyCompletion(SparkLauncher.NO_RESOURCE)
   }
 
+  test("Added files should be placed in the driver's working directory.") {
+    assume(testBackend.name == MINIKUBE_TEST_BACKEND)
+    val testExistenceFileTempDir = Utils.createTempDir(namePrefix = "test-existence-file-temp-dir")
+    val testExistenceFile = new File(testExistenceFileTempDir, "input.txt")
+    Files.write(TEST_EXISTENCE_FILE_CONTENTS, testExistenceFile, Charsets.UTF_8)
+    launchStagingServer(SSLOptions(), None)
+    sparkConf.set("spark.files", testExistenceFile.getAbsolutePath)
+    runSparkApplicationAndVerifyCompletion(
+      SUBMITTER_LOCAL_MAIN_APP_RESOURCE,
+      FILE_EXISTENCE_MAIN_CLASS,
+      s"File found at /opt/spark/${testExistenceFile.getName} with correct contents.",
+      Array(testExistenceFile.getName, TEST_EXISTENCE_FILE_CONTENTS))
+  }
+
   private def launchStagingServer(
       resourceStagingServerSslOptions: SSLOptions, keyAndCertPem: Option[KeyAndCertPem]): Unit = {
     assume(testBackend.name == MINIKUBE_TEST_BACKEND)
@@ -190,27 +209,19 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
   }
 
   private def runSparkPiAndVerifyCompletion(appResource: String): Unit = {
-    Client.run(sparkConf, appResource, SPARK_PI_MAIN_CLASS, Array.empty[String])
-    val driverPod = kubernetesTestComponents.kubernetesClient
-      .pods()
-      .withLabel("spark-app-locator", APP_LOCATOR_LABEL)
-      .list()
-      .getItems
-      .get(0)
-    Eventually.eventually(TIMEOUT, INTERVAL) {
-      assert(kubernetesTestComponents.kubernetesClient
-        .pods()
-        .withName(driverPod.getMetadata.getName)
-        .getLog
-        .contains("Pi is roughly 3"), "The application did not compute the value of pi.")
-    }
+    runSparkApplicationAndVerifyCompletion(
+      appResource, SPARK_PI_MAIN_CLASS, "Pi is roughly 3", Array.empty[String])
   }
 
-  private def runSparkGroupByTestAndVerifyCompletion(appResource: String): Unit = {
+  private def runSparkApplicationAndVerifyCompletion(
+      appResource: String,
+      mainClass: String,
+      expectedLogOnCompletion: String,
+      appArgs: Array[String]): Unit = {
     Client.run(
       sparkConf = sparkConf,
-      appArgs = Array.empty[String],
-      mainClass = GROUP_BY_MAIN_CLASS,
+      appArgs = appArgs,
+      mainClass = mainClass,
       mainAppResource = appResource)
     val driverPod = kubernetesTestComponents.kubernetesClient
       .pods()
@@ -223,7 +234,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
         .pods()
         .withName(driverPod.getMetadata.getName)
         .getLog
-        .contains("The Result is"), "The application did not complete.")
+        .contains(expectedLogOnCompletion), "The application did not complete.")
     }
   }
 
@@ -285,8 +296,6 @@ private[spark] object KubernetesSuite {
   val CONTAINER_LOCAL_HELPER_JAR_PATH = s"local:///opt/spark/examples/" +
     s"integration-tests-jars/${HELPER_JAR_FILE.getName}"
-  val TEST_EXISTENCE_FILE = Paths.get("test-data", "input.txt").toFile
-  val TEST_EXISTENCE_FILE_CONTENTS = Files.toString(TEST_EXISTENCE_FILE, Charsets.UTF_8)
   val TIMEOUT = PatienceConfiguration.Timeout(Span(2, Minutes))
   val INTERVAL = PatienceConfiguration.Interval(Span(2, Seconds))
   val SPARK_PI_MAIN_CLASS = "org.apache.spark.deploy.kubernetes" +
@@ -295,6 +304,7 @@ private[spark] object KubernetesSuite {
     ".integrationtest.jobs.FileExistenceTest"
   val GROUP_BY_MAIN_CLASS = "org.apache.spark.deploy.kubernetes" +
     ".integrationtest.jobs.GroupByTest"
+  val TEST_EXISTENCE_FILE_CONTENTS = "contents"
 
   case object ShuffleNotReadyException extends Exception
 }
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesTestComponents.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesTestComponents.scala
index 9ae0d9ade7dc2..0ca1f482269db 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesTestComponents.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesTestComponents.scala
@@ -63,7 +63,7 @@ private[spark] class KubernetesTestComponents(defaultClient: DefaultKubernetesCl
       .set("spark.executor.memory", "500m")
       .set("spark.executor.cores", "1")
       .set("spark.executors.instances", "1")
-      .set("spark.app.name", "spark-pi")
+      .set("spark.app.name", "spark-test-app")
       .set("spark.ui.enabled", "true")
      .set("spark.testing", "false")
       .set(WAIT_FOR_APP_COMPLETION, false)
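With the suite generalized around runSparkApplicationAndVerifyCompletion, a new log-assertion test reduces to a single call. A sketch, where the main class and expected message are hypothetical placeholders:

    runSparkApplicationAndVerifyCompletion(
      SUBMITTER_LOCAL_MAIN_APP_RESOURCE,
      "org.apache.spark.deploy.kubernetes.integrationtest.jobs.MyNewTest", // hypothetical
      "expected completion log line",                                      // hypothetical
      Array.empty[String])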