Added files should be in the working directories. (apache#294)
* Added files should be in the working directories.

* Revert unintentional changes

* Fix test
mccheah authored and ash211 committed May 23, 2017
1 parent 8f3d965 commit 56414f9
Showing 10 changed files with 75 additions and 31 deletions.
16 changes: 16 additions & 0 deletions docs/running-on-kubernetes.md
@@ -558,6 +558,22 @@ from the other deployment modes. See the [configuration page](configuration.html
disk as a secret into the init-containers.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.mountdependencies.jarsDownloadDir</code></td>
<td><code>/var/spark-data/spark-jars</code></td>
<td>
Location to download jars to in the driver and executors. This will be mounted as an empty directory volume
into the driver and executor containers.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.mountdependencies.filesDownloadDir</code></td>
<td><code>/var/spark-data/spark-files</code></td>
<td>
Location to download files to in the driver and executors. This will be mounted as an empty directory volume
into the driver and executor containers.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.report.interval</code></td>
<td><code>1s</code></td>
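These two properties can be overridden at submission time like any other Spark setting. A minimal sketch, assuming a plain SparkConf in client code; the paths shown are simply the defaults from the table above:

```scala
import org.apache.spark.SparkConf

// Sketch only: override the dependency download directories. Any empty, writable
// directory inside the driver/executor images would work; these are the defaults.
val conf = new SparkConf()
  .set("spark.kubernetes.mountdependencies.jarsDownloadDir", "/var/spark-data/spark-jars")
  .set("spark.kubernetes.mountdependencies.filesDownloadDir", "/var/spark-data/spark-files")
```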
@@ -93,6 +93,10 @@ private[spark] class SparkPodInitContainerBootstrapImpl(
.endVolume()
.editMatchingContainer(new ContainerNameEqualityPredicate(mainContainerName))
.addToVolumeMounts(sharedVolumeMounts: _*)
.addNewEnv()
.withName(ENV_MOUNTED_FILES_DIR)
.withValue(filesDownloadPath)
.endEnv()
.endContainer()
.endSpec()
resourceStagingServerSecretPlugin.map { plugin =>
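For reference, the environment variable that this .addNewEnv()/.endEnv() chain appends to the main container can also be built standalone with fabric8's EnvVarBuilder. A minimal sketch, using the name defined for ENV_MOUNTED_FILES_DIR in the constants change below and assuming filesDownloadPath holds the configured files download directory:

```scala
import io.fabric8.kubernetes.api.model.{EnvVar, EnvVarBuilder}

// Sketch only: the same env var the fluent builder above adds to the main container.
// The value is whatever spark.kubernetes.mountdependencies.filesDownloadDir resolves
// to; the default is shown here.
val filesDownloadPath = "/var/spark-data/spark-files"
val mountedFilesDirEnv: EnvVar = new EnvVarBuilder()
  .withName("SPARK_MOUNTED_FILES_DIR")
  .withValue(filesDownloadPath)
  .build()
```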
@@ -447,15 +447,15 @@ package object config extends Logging {
" spark-submit, this directory must be empty and will be mounted as an empty directory" +
" volume on the driver and executor pod.")
.stringConf
.createWithDefault("/var/spark-data/spark-submitted-jars")
.createWithDefault("/var/spark-data/spark-jars")

private[spark] val INIT_CONTAINER_FILES_DOWNLOAD_LOCATION =
ConfigBuilder("spark.kubernetes.mountdependencies.filesDownloadDir")
.doc("Location to download files to in the driver and executors. When using" +
" spark-submit, this directory must be empty and will be mounted as an empty directory" +
" volume on the driver and executor pods.")
.stringConf
.createWithDefault("/var/spark-data/spark-submitted-files")
.createWithDefault("/var/spark-data/spark-files")

private[spark] val INIT_CONTAINER_MOUNT_TIMEOUT =
ConfigBuilder("spark.kubernetes.mountdependencies.mountTimeout")
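The entries above use Spark's internal ConfigBuilder DSL. A hedged sketch of the same pattern with a purely hypothetical key, wrapped in its own package object so the private[spark] builder is accessible (neither this package object nor this config entry exists in the codebase):

```scala
package org.apache.spark.deploy.kubernetes

import org.apache.spark.internal.config.ConfigBuilder

package object exampleconfig {
  // Hypothetical entry, shown only to illustrate the ConfigBuilder pattern used above.
  private[spark] val EXAMPLE_DOWNLOAD_LOCATION =
    ConfigBuilder("spark.kubernetes.mountdependencies.exampleDownloadDir")
      .doc("Location to download example dependencies to in the driver and executors.")
      .stringConf
      .createWithDefault("/var/spark-data/spark-example")
}
```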
@@ -92,6 +92,7 @@ package object constants {
private[spark] val ENV_DRIVER_MAIN_CLASS = "SPARK_DRIVER_CLASS"
private[spark] val ENV_DRIVER_ARGS = "SPARK_DRIVER_ARGS"
private[spark] val ENV_DRIVER_JAVA_OPTS = "SPARK_DRIVER_JAVA_OPTS"
private[spark] val ENV_MOUNTED_FILES_DIR = "SPARK_MOUNTED_FILES_DIR"

// Annotation keys
private[spark] val ANNOTATION_PROVIDE_EXTERNAL_URI =
@@ -111,6 +111,16 @@ class SparkPodInitContainerBootstrapSuite extends SparkFunSuite with BeforeAndAf
})
}

test("Files download path is set as environment variable") {
val bootstrappedPod = bootstrapPodWithoutSubmittedDependencies()
val containers = bootstrappedPod.getSpec.getContainers.asScala
val maybeMainContainer = containers.find(_.getName === MAIN_CONTAINER_NAME)
assert(maybeMainContainer.exists { mainContainer =>
mainContainer.getEnv.asScala.exists(envVar =>
envVar.getName == ENV_MOUNTED_FILES_DIR && envVar.getValue == FILES_DOWNLOAD_PATH)
})
}

test("Running with submitted dependencies modifies the init container with the plugin.") {
val bootstrappedPod = bootstrapPodWithSubmittedDependencies()
val podAnnotations = bootstrappedPod.getMetadata.getAnnotations.asScala
@@ -40,4 +40,5 @@ CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \
if ! [ -z ${SPARK_MOUNTED_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_MOUNTED_CLASSPATH:$SPARK_CLASSPATH"; fi && \
if ! [ -z ${SPARK_SUBMIT_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_SUBMIT_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
if ! [ -z ${SPARK_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
if ! [ -z ${SPARK_MOUNTED_FILES_DIR} ]; then cp -R "$SPARK_MOUNTED_FILES_DIR/." .; fi && \
exec ${JAVA_HOME}/bin/java $SPARK_DRIVER_JAVA_OPTS -cp $SPARK_CLASSPATH -Xms$SPARK_DRIVER_MEMORY -Xmx$SPARK_DRIVER_MEMORY $SPARK_DRIVER_CLASS $SPARK_DRIVER_ARGS
@@ -40,4 +40,5 @@ WORKDIR /opt/spark
CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \
if ! [ -z ${SPARK_MOUNTED_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_MOUNTED_CLASSPATH:$SPARK_CLASSPATH"; fi && \
if ! [ -z ${SPARK_EXECUTOR_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXECUTOR_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
if ! [ -z ${SPARK_MOUNTED_FILES_DIR} ]; then cp -R "$SPARK_MOUNTED_FILES_DIR/." .; fi && \
exec ${JAVA_HOME}/bin/java -Dspark.executor.port=$SPARK_EXECUTOR_PORT -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp $SPARK_CLASSPATH org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP
@@ -28,7 +28,9 @@ private[spark] object FileExistenceTest {

def main(args: Array[String]): Unit = {
if (args.length < 2) {
throw new IllegalArgumentException("Usage: WordCount <source-file> <expected contents>")
throw new IllegalArgumentException(
s"Invalid args: ${args.mkString}, " +
"Usage: FileExistenceTest <source-file> <expected contents>")
}
// Can't use SparkContext.textFile since the file is local to the driver
val file = Paths.get(args(0)).toFile
@@ -39,16 +41,15 @@
val contents = Files.toString(file, Charsets.UTF_8)
if (args(1) != contents) {
throw new SparkException(s"Contents do not match. Expected: ${args(1)}," +
s" actual, $contents")
s" actual: $contents")
} else {
println(s"File found at ${file.getAbsolutePath} with correct contents.")
}
// scalastyle:on println
}
val spark = SparkSession.builder()
.appName("Test")
.getOrCreate()
spark.stop()
while (true) {
Thread.sleep(600000)
}
}

}
@@ -16,6 +16,7 @@
*/
package org.apache.spark.deploy.kubernetes.integrationtest

import java.io.File
import java.nio.file.Paths
import java.util.UUID

@@ -35,11 +36,11 @@ import org.apache.spark.deploy.kubernetes.integrationtest.backend.minikube.Minik
import org.apache.spark.deploy.kubernetes.integrationtest.constants.MINIKUBE_TEST_BACKEND
import org.apache.spark.deploy.kubernetes.submit.{Client, KeyAndCertPem}
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.util.Utils

private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
import KubernetesSuite._
private val testBackend = IntegrationTestBackendFactory.getTestBackend()

private val APP_LOCATOR_LABEL = UUID.randomUUID().toString.replaceAll("-", "")
private var kubernetesTestComponents: KubernetesTestComponents = _
private var sparkConf: SparkConf = _
@@ -124,7 +125,11 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
sparkConf.set("spark.kubernetes.shuffle.labels", "app=spark-shuffle-service")
sparkConf.set("spark.kubernetes.shuffle.namespace", kubernetesTestComponents.namespace)
sparkConf.set("spark.app.name", "group-by-test")
runSparkGroupByTestAndVerifyCompletion(SUBMITTER_LOCAL_MAIN_APP_RESOURCE)
runSparkApplicationAndVerifyCompletion(
SUBMITTER_LOCAL_MAIN_APP_RESOURCE,
GROUP_BY_MAIN_CLASS,
"The Result is",
Array.empty[String])
}

test("Use remote resources without the resource staging server.") {
@@ -173,6 +178,20 @@
runSparkPiAndVerifyCompletion(SparkLauncher.NO_RESOURCE)
}

test("Added files should be placed in the driver's working directory.") {
assume(testBackend.name == MINIKUBE_TEST_BACKEND)
val testExistenceFileTempDir = Utils.createTempDir(namePrefix = "test-existence-file-temp-dir")
val testExistenceFile = new File(testExistenceFileTempDir, "input.txt")
Files.write(TEST_EXISTENCE_FILE_CONTENTS, testExistenceFile, Charsets.UTF_8)
launchStagingServer(SSLOptions(), None)
sparkConf.set("spark.files", testExistenceFile.getAbsolutePath)
runSparkApplicationAndVerifyCompletion(
SUBMITTER_LOCAL_MAIN_APP_RESOURCE,
FILE_EXISTENCE_MAIN_CLASS,
s"File found at /opt/spark/${testExistenceFile.getName} with correct contents.",
Array(testExistenceFile.getName, TEST_EXISTENCE_FILE_CONTENTS))
}

private def launchStagingServer(
resourceStagingServerSslOptions: SSLOptions, keyAndCertPem: Option[KeyAndCertPem]): Unit = {
assume(testBackend.name == MINIKUBE_TEST_BACKEND)
@@ -190,27 +209,19 @@
}

private def runSparkPiAndVerifyCompletion(appResource: String): Unit = {
Client.run(sparkConf, appResource, SPARK_PI_MAIN_CLASS, Array.empty[String])
val driverPod = kubernetesTestComponents.kubernetesClient
.pods()
.withLabel("spark-app-locator", APP_LOCATOR_LABEL)
.list()
.getItems
.get(0)
Eventually.eventually(TIMEOUT, INTERVAL) {
assert(kubernetesTestComponents.kubernetesClient
.pods()
.withName(driverPod.getMetadata.getName)
.getLog
.contains("Pi is roughly 3"), "The application did not compute the value of pi.")
}
runSparkApplicationAndVerifyCompletion(
appResource, SPARK_PI_MAIN_CLASS, "Pi is roughly 3", Array.empty[String])
}

private def runSparkGroupByTestAndVerifyCompletion(appResource: String): Unit = {
private def runSparkApplicationAndVerifyCompletion(
appResource: String,
mainClass: String,
expectedLogOnCompletion: String,
appArgs: Array[String]): Unit = {
Client.run(
sparkConf = sparkConf,
appArgs = Array.empty[String],
mainClass = GROUP_BY_MAIN_CLASS,
appArgs = appArgs,
mainClass = mainClass,
mainAppResource = appResource)
val driverPod = kubernetesTestComponents.kubernetesClient
.pods()
@@ -223,7 +234,7 @@
.pods()
.withName(driverPod.getMetadata.getName)
.getLog
.contains("The Result is"), "The application did not complete.")
.contains(expectedLogOnCompletion), "The application did not complete.")
}
}

@@ -285,8 +296,6 @@ private[spark] object KubernetesSuite {
val CONTAINER_LOCAL_HELPER_JAR_PATH = s"local:///opt/spark/examples/" +
s"integration-tests-jars/${HELPER_JAR_FILE.getName}"

val TEST_EXISTENCE_FILE = Paths.get("test-data", "input.txt").toFile
val TEST_EXISTENCE_FILE_CONTENTS = Files.toString(TEST_EXISTENCE_FILE, Charsets.UTF_8)
val TIMEOUT = PatienceConfiguration.Timeout(Span(2, Minutes))
val INTERVAL = PatienceConfiguration.Interval(Span(2, Seconds))
val SPARK_PI_MAIN_CLASS = "org.apache.spark.deploy.kubernetes" +
@@ -295,6 +304,7 @@
".integrationtest.jobs.FileExistenceTest"
val GROUP_BY_MAIN_CLASS = "org.apache.spark.deploy.kubernetes" +
".integrationtest.jobs.GroupByTest"
val TEST_EXISTENCE_FILE_CONTENTS = "contents"

case object ShuffleNotReadyException extends Exception
}
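The new integration test exercises the behavior added by the Dockerfile changes: files distributed through spark.files are copied into the container's working directory, so driver code can read them with plain file I/O, as FileExistenceTest does. A minimal sketch, assuming a file named input.txt was submitted via spark.files:

```scala
import java.nio.file.{Files, Paths}
import scala.collection.JavaConverters._

// Sketch only: "input.txt" is assumed to have been passed with
// spark.files=/some/local/path/input.txt; after the `cp -R "$SPARK_MOUNTED_FILES_DIR/." .`
// step in the Dockerfile CMD, it sits in the driver's working directory.
object ReadStagedFile {
  def main(args: Array[String]): Unit = {
    val contents = Files.readAllLines(Paths.get("input.txt")).asScala.mkString("\n")
    println(s"Staged file contents: $contents")
  }
}
```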
@@ -63,7 +63,7 @@ private[spark] class KubernetesTestComponents(defaultClient: DefaultKubernetesCl
.set("spark.executor.memory", "500m")
.set("spark.executor.cores", "1")
.set("spark.executors.instances", "1")
.set("spark.app.name", "spark-pi")
.set("spark.app.name", "spark-test-app")
.set("spark.ui.enabled", "true")
.set("spark.testing", "false")
.set(WAIT_FOR_APP_COMPLETION, false)
