diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index edf5b9c3912a0..85b76013ba5f3 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -346,8 +346,6 @@ object SparkSubmit extends CommandLineUtils { (clusterManager, deployMode) match { case (KUBERNETES, CLIENT) => printErrorAndExit("Client mode is currently not supported for Kubernetes.") - case (KUBERNETES, CLUSTER) if args.isR => - printErrorAndExit("Kubernetes does not currently support R applications.") case (STANDALONE, CLUSTER) if args.isPython => printErrorAndExit("Cluster deploy mode is currently not supported for python " + "applications on standalone clusters.") @@ -642,6 +640,9 @@ object SparkSubmit extends CommandLineUtils { if (args.pyFiles != null) { childArgs ++= Array("--other-py-files", args.pyFiles) } + } else if (args.isR) { + childArgs ++= Array("--primary-r-file", args.primaryResource) + childArgs ++= Array("--main-class", "org.apache.spark.deploy.RRunner") } else { childArgs ++= Array("--primary-java-resource", args.primaryResource) childArgs ++= Array("--main-class", args.mainClass) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/constants.scala index 95d7f284f86da..1dad5393ca301 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/constants.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/constants.scala @@ -69,6 +69,7 @@ package object constants { private[spark] val ENV_MOUNTED_FILES_DIR = "SPARK_MOUNTED_FILES_DIR" private[spark] val ENV_PYSPARK_FILES = "PYSPARK_FILES" private[spark] val ENV_PYSPARK_PRIMARY = "PYSPARK_PRIMARY" + private[spark] val ENV_R_FILE = "R_FILE" private[spark] val ENV_JAVA_OPT_PREFIX = "SPARK_JAVA_OPT_" private[spark] val ENV_MOUNTED_FILES_FROM_SECRET_DIR = "SPARK_MOUNTED_FILES_FROM_SECRET_DIR" diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/Client.scala index 9d921c4f3fe08..6ed497130429f 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/Client.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/Client.scala @@ -46,6 +46,8 @@ private[spark] object ClientArguments { args.sliding(2, 2).toList.collect { case Array("--primary-py-file", mainPyFile: String) => mainAppResource = Some(PythonMainAppResource(mainPyFile)) + case Array("--primary-r-file", primaryRFile: String) => + mainAppResource = Some(RMainAppResource(primaryRFile)) case Array("--primary-java-resource", primaryJavaResource: String) => mainAppResource = Some(JavaMainAppResource(primaryJavaResource)) case Array("--main-class", clazz: String) => diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestrator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestrator.scala index 92ec8e5d85260..70fd9b9a17707 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestrator.scala +++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestrator.scala @@ -20,7 +20,7 @@ import org.apache.spark.SparkConf import org.apache.spark.deploy.k8s.ConfigurationUtils import org.apache.spark.deploy.k8s.config._ import org.apache.spark.deploy.k8s.constants._ -import org.apache.spark.deploy.k8s.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverAddressConfigurationStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, InitContainerBootstrapStep, MountSecretsStep, MountSmallLocalFilesStep, PythonStep} +import org.apache.spark.deploy.k8s.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverAddressConfigurationStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, InitContainerBootstrapStep, MountSecretsStep, MountSmallLocalFilesStep, PythonStep, RStep} import org.apache.spark.deploy.k8s.submit.submitsteps.initcontainer.InitContainerConfigurationStepsOrchestrator import org.apache.spark.deploy.k8s.submit.submitsteps.LocalDirectoryMountConfigurationStep import org.apache.spark.launcher.SparkLauncher @@ -64,6 +64,11 @@ private[spark] class DriverConfigurationStepsOrchestrator( Option(resource) case _ => Option.empty } + val additionalMainAppRFile = mainAppResource match { + case RMainAppResource(resource) if resource != SparkLauncher.NO_RESOURCE => + Option(resource) + case _ => Option.empty + } val sparkJars = submissionSparkConf.getOption("spark.jars") .map(_.split(",")) .getOrElse(Array.empty[String]) ++ @@ -72,6 +77,7 @@ private[spark] class DriverConfigurationStepsOrchestrator( .map(_.split(",")) .getOrElse(Array.empty[String]) ++ additionalMainAppPythonFile.toSeq ++ + additionalMainAppRFile.toSeq ++ additionalPythonFiles val driverCustomLabels = ConfigurationUtils.parsePrefixedKeyValuePairs( submissionSparkConf, @@ -108,9 +114,11 @@ private[spark] class DriverConfigurationStepsOrchestrator( val localDirectoryMountConfigurationStep = new LocalDirectoryMountConfigurationStep( submissionSparkConf) - val pythonStep = mainAppResource match { + val resourceStep = mainAppResource match { case PythonMainAppResource(mainPyResource) => Option(new PythonStep(mainPyResource, additionalPythonFiles, filesDownloadPath)) + case RMainAppResource(mainRFile) => + Option(new RStep(mainRFile, filesDownloadPath)) case _ => Option.empty[DriverConfigurationStep] } @@ -188,7 +196,7 @@ private[spark] class DriverConfigurationStepsOrchestrator( dependencyResolutionStep, localDirectoryMountConfigurationStep) ++ submittedDependenciesBootstrapSteps ++ - pythonStep.toSeq ++ + resourceStep.toSeq ++ mountSecretsStep.toSeq } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/MainAppResource.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/MainAppResource.scala index f806e65974fcc..5b70b3e38904e 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/MainAppResource.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/MainAppResource.scala @@ -20,4 +20,6 @@ private[spark] sealed trait MainAppResource private[spark] case class PythonMainAppResource(primaryPyFile: String) extends MainAppResource +private[spark] case class RMainAppResource(primaryRFile: String) extends MainAppResource + private[spark] case class JavaMainAppResource(primaryResource: String) extends MainAppResource diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/RStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/RStep.scala new file mode 100644 index 0000000000000..c4d1d63c2c4f4 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/RStep.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.submit.submitsteps + +import io.fabric8.kubernetes.api.model.ContainerBuilder + +import org.apache.spark.deploy.k8s.constants._ +import org.apache.spark.deploy.k8s.submit.KubernetesFileUtils + +private[spark] class RStep( + mainRFile: String, + filesDownloadPath: String) extends DriverConfigurationStep { + + override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = { + val withRFileContainer = new ContainerBuilder(driverSpec.driverContainer) + .addNewEnv() + .withName(ENV_R_FILE) + .withValue(KubernetesFileUtils.resolveFilePath(mainRFile, filesDownloadPath)) + .endEnv() + driverSpec.copy(driverContainer = withRFileContainer.build()) + } +} diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestratorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestratorSuite.scala index 1199f033cf06a..cd80bc53b2211 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestratorSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestratorSuite.scala @@ -18,7 +18,7 @@ package org.apache.spark.deploy.k8s.submit import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.deploy.k8s.config._ -import org.apache.spark.deploy.k8s.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverAddressConfigurationStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, InitContainerBootstrapStep, LocalDirectoryMountConfigurationStep, MountSecretsStep, MountSmallLocalFilesStep, PythonStep} +import org.apache.spark.deploy.k8s.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverAddressConfigurationStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, InitContainerBootstrapStep, LocalDirectoryMountConfigurationStep, MountSecretsStep, MountSmallLocalFilesStep, PythonStep, RStep} private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunSuite { @@ -45,7 +45,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS APP_NAME, 
MAIN_CLASS, APP_ARGS, - ADDITIONAL_PYTHON_FILES, + Seq.empty[String], sparkConf) validateStepTypes( orchestrator, @@ -69,7 +69,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS APP_NAME, MAIN_CLASS, APP_ARGS, - ADDITIONAL_PYTHON_FILES, + Seq.empty[String], sparkConf) validateStepTypes( orchestrator, @@ -85,15 +85,15 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS val sparkConf = new SparkConf(false) val mainAppResource = PythonMainAppResource("local:///var/apps/python/main.py") val orchestrator = new DriverConfigurationStepsOrchestrator( - NAMESPACE, - APP_ID, - LAUNCH_TIME, - mainAppResource, - APP_NAME, - MAIN_CLASS, - APP_ARGS, - ADDITIONAL_PYTHON_FILES, - sparkConf) + NAMESPACE, + APP_ID, + LAUNCH_TIME, + mainAppResource, + APP_NAME, + MAIN_CLASS, + APP_ARGS, + ADDITIONAL_PYTHON_FILES, + sparkConf) validateStepTypes( orchestrator, classOf[BaseDriverConfigurationStep], @@ -104,6 +104,30 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS classOf[PythonStep]) } + test("Submission steps with R file.") { + val sparkConf = new SparkConf(false) + val mainAppResource = RMainAppResource("local:///var/apps/r/main.R") + val orchestrator = new DriverConfigurationStepsOrchestrator( + NAMESPACE, + APP_ID, + LAUNCH_TIME, + mainAppResource, + APP_NAME, + MAIN_CLASS, + APP_ARGS, + Seq.empty[String], + sparkConf) + validateStepTypes( + orchestrator, + classOf[BaseDriverConfigurationStep], + classOf[DriverAddressConfigurationStep], + classOf[DriverKubernetesCredentialsStep], + classOf[DependencyResolutionStep], + classOf[LocalDirectoryMountConfigurationStep], + classOf[RStep]) + } + + test("Only local files without a resource staging server.") { val sparkConf = new SparkConf(false).set("spark.files", "/var/spark/file1.txt") val mainAppResource = JavaMainAppResource("local:///var/apps/jars/main.jar") @@ -115,7 +139,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS APP_NAME, MAIN_CLASS, APP_ARGS, - ADDITIONAL_PYTHON_FILES, + Seq.empty[String], sparkConf) validateStepTypes( orchestrator, @@ -140,7 +164,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS APP_NAME, MAIN_CLASS, APP_ARGS, - ADDITIONAL_PYTHON_FILES, + Seq.empty[String], sparkConf) validateStepTypes( orchestrator, diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/RStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/RStepSuite.scala new file mode 100644 index 0000000000000..1c69edde10832 --- /dev/null +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/RStepSuite.scala @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.submit.submitsteps + +import io.fabric8.kubernetes.api.model._ +import org.scalatest.BeforeAndAfter +import scala.collection.JavaConverters._ + +import org.apache.spark.{SparkConf, SparkFunSuite} + +class RStepSuite extends SparkFunSuite with BeforeAndAfter { + private val FILE_DOWNLOAD_PATH = "/var/data/spark-files" + private val R_PRIMARY_FILE_OP1 = "local:///app/files/file1.R" + private val RESOLVED_R_PRIMARY_FILE_OP1 = "/app/files/file1.R" + private val R_PRIMARY_FILE_OP2 = "file:///app/files/file2.R" + private val RESOLVED_R_PRIMARY_FILE_OP2 = FILE_DOWNLOAD_PATH + "/file2.R" + + test("testing RSpark with local file") { + val rStep = new RStep( + R_PRIMARY_FILE_OP1, + FILE_DOWNLOAD_PATH) + val returnedDriverContainer = rStep.configureDriver( + KubernetesDriverSpec( + new Pod(), + new Container(), + Seq.empty[HasMetadata], + new SparkConf)) + assert(returnedDriverContainer.driverContainer.getEnv + .asScala.map(env => (env.getName, env.getValue)).toMap === + Map( + "R_FILE" -> RESOLVED_R_PRIMARY_FILE_OP1)) + } + + test("testing RSpark with remote file") { + val rStep = new RStep( + R_PRIMARY_FILE_OP2, + FILE_DOWNLOAD_PATH) + val returnedDriverContainer = rStep.configureDriver( + KubernetesDriverSpec( + new Pod(), + new Container(), + Seq.empty[HasMetadata], + new SparkConf)) + assert(returnedDriverContainer.driverContainer.getEnv + .asScala.map(env => (env.getName, env.getValue)).toMap === + Map( + "R_FILE" -> RESOLVED_R_PRIMARY_FILE_OP2)) + } + +} diff --git a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver-r/Dockerfile b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver-r/Dockerfile new file mode 100644 index 0000000000000..26b1231ae2ec9 --- /dev/null +++ b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver-r/Dockerfile @@ -0,0 +1,39 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +FROM spark-base + +# If this docker file is being used in the context of building your images from a Spark distribution, the docker build +# command should be invoked from the top level directory of the Spark distribution. E.g.: +# docker build -t spark-driver-r:latest -f dockerfiles/driver-r/Dockerfile . + +ADD examples /opt/spark/examples +ADD R /opt/spark/R + +RUN apk add --no-cache R R-dev + +ENV R_HOME /usr/lib/R + +CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \ + env | grep SPARK_JAVA_OPT_ | sed 's/[^=]*=\(.*\)/\1/g' > /tmp/java_opts.txt && \ + readarray -t SPARK_DRIVER_JAVA_OPTS < /tmp/java_opts.txt && \ + if ! [ -z ${SPARK_MOUNTED_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_MOUNTED_CLASSPATH:$SPARK_CLASSPATH"; fi && \ + if ! 
[ -z ${SPARK_SUBMIT_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_SUBMIT_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \ + if ! [ -z ${SPARK_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \ + if ! [ -z ${SPARK_MOUNTED_FILES_DIR+x} ]; then cp -R "$SPARK_MOUNTED_FILES_DIR/." .; fi && \ + if ! [ -z ${SPARK_MOUNTED_FILES_FROM_SECRET_DIR+x} ]; then cp -R "$SPARK_MOUNTED_FILES_FROM_SECRET_DIR/." .; fi && \ + ${JAVA_HOME}/bin/java "${SPARK_DRIVER_JAVA_OPTS[@]}" -cp $SPARK_CLASSPATH -Xms$SPARK_DRIVER_MEMORY -Xmx$SPARK_DRIVER_MEMORY $SPARK_DRIVER_CLASS $R_FILE $SPARK_DRIVER_ARGS diff --git a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor-r/Dockerfile b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor-r/Dockerfile new file mode 100644 index 0000000000000..fc6ca47df24dc --- /dev/null +++ b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor-r/Dockerfile @@ -0,0 +1,38 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +FROM spark-base + +# If this docker file is being used in the context of building your images from a Spark distribution, the docker build +# command should be invoked from the top level directory of the Spark distribution. E.g.: +# docker build -t spark-executor-r:latest -f dockerfiles/executor-r/Dockerfile . + +ADD examples /opt/spark/examples +ADD R /opt/spark/R + +RUN apk add --no-cache R R-dev + +ENV R_HOME /usr/lib/R + +CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \ + env | grep SPARK_JAVA_OPT_ | sed 's/[^=]*=\(.*\)/\1/g' > /tmp/java_opts.txt && \ + readarray -t SPARK_EXECUTOR_JAVA_OPTS < /tmp/java_opts.txt && \ + if ! [ -z ${SPARK_MOUNTED_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_MOUNTED_CLASSPATH:$SPARK_CLASSPATH"; fi && \ + if ! [ -z ${SPARK_EXECUTOR_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXECUTOR_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \ + if ! [ -z ${SPARK_MOUNTED_FILES_DIR+x} ]; then cp -R "$SPARK_MOUNTED_FILES_DIR/." .; fi && \ + if ! [ -z ${SPARK_MOUNTED_FILES_FROM_SECRET_DIR+x} ]; then cp -R "$SPARK_MOUNTED_FILES_FROM_SECRET_DIR/."
.; fi && \ + ${JAVA_HOME}/bin/java "${SPARK_EXECUTOR_JAVA_OPTS[@]}" -Dspark.executor.port=$SPARK_EXECUTOR_PORT -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp $SPARK_CLASSPATH org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP diff --git a/resource-managers/kubernetes/integration-tests/pom.xml b/resource-managers/kubernetes/integration-tests/pom.xml index d95f2f68db58f..dda4743d607b2 100644 --- a/resource-managers/kubernetes/integration-tests/pom.xml +++ b/resource-managers/kubernetes/integration-tests/pom.xml @@ -239,6 +239,21 @@ + <execution> + <id>copy-integration-r</id> + <phase>pre-integration-test</phase> + <goals> + <goal>copy-resources</goal> + </goals> + <configuration> + <outputDirectory>${project.build.directory}/docker/R</outputDirectory> + <resources> + <resource> + <directory>${project.parent.basedir}/R</directory> + </resource> + </resources> + </configuration> + </execution> <execution> <id>copy-integration-data</id> <phase>pre-integration-test</phase> diff --git a/resource-managers/kubernetes/integration-tests/src/test/R/dataframe.R b/resource-managers/kubernetes/integration-tests/src/test/R/dataframe.R new file mode 100644 index 0000000000000..311350497f873 --- /dev/null +++ b/resource-managers/kubernetes/integration-tests/src/test/R/dataframe.R @@ -0,0 +1,59 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# To run this example use +# ./bin/spark-submit examples/src/main/r/dataframe.R + +library(SparkR) + +# Initialize SparkSession +sparkR.session(appName = "SparkR-DataFrame-example") + +# Create a simple local data.frame +localDF <- data.frame(name=c("John", "Smith", "Sarah"), age=c(19, 23, 18)) + +# Convert local data frame to a SparkDataFrame +df <- createDataFrame(localDF) + +# Print its schema +printSchema(df) +# root +# |-- name: string (nullable = true) +# |-- age: double (nullable = true) + +# Create a DataFrame from a JSON file +path <- file.path(Sys.getenv("SPARK_HOME"), "examples/src/main/resources/people.json") +peopleDF <- read.json(path) +printSchema(peopleDF) +# root +# |-- age: long (nullable = true) +# |-- name: string (nullable = true) + +# Register this DataFrame as a table.
+createOrReplaceTempView(peopleDF, "people") + +# SQL statements can be run by using the sql methods +teenagers <- sql("SELECT name FROM people WHERE age >= 13 AND age <= 19") + +# Call collect to get a local data.frame +teenagersLocalDF <- collect(teenagers) + +# Print the teenagers in our dataset +print(teenagersLocalDF) + +# Stop the SparkSession now +sparkR.session.stop() diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index 27041207ffdce..9d48d488bf967 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -34,7 +34,7 @@ import org.apache.spark.deploy.k8s.config._ import org.apache.spark.deploy.k8s.integrationtest.backend.IntegrationTestBackendFactory import org.apache.spark.deploy.k8s.integrationtest.backend.minikube.Minikube import org.apache.spark.deploy.k8s.integrationtest.constants.MINIKUBE_TEST_BACKEND -import org.apache.spark.deploy.k8s.submit.{Client, ClientArguments, JavaMainAppResource, KeyAndCertPem, MainAppResource, PythonMainAppResource} +import org.apache.spark.deploy.k8s.submit.{Client, ClientArguments, JavaMainAppResource, KeyAndCertPem, MainAppResource, PythonMainAppResource, RMainAppResource} import org.apache.spark.launcher.SparkLauncher import org.apache.spark.util.Utils @@ -101,6 +101,32 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { runPySparkPiAndVerifyCompletion(PYSPARK_PI_CONTAINER_LOCAL_FILE_LOCATION, Seq.empty[String]) } + test("Run SparkR Job on file locally") { + assume(testBackend.name == MINIKUBE_TEST_BACKEND) + + launchStagingServer(SSLOptions(), None) + sparkConf + .set(DRIVER_DOCKER_IMAGE, + System.getProperty("spark.docker.test.driverImage", "spark-driver-r:latest")) + .set(EXECUTOR_DOCKER_IMAGE, + System.getProperty("spark.docker.test.executorImage", "spark-executor-r:latest")) + + runSparkRAndVerifyCompletion(SPARK_R_DATAFRAME_CONTAINER_LOCAL_FILE_LOCATION) + } + + test("Run SparkR Job on file from SUBMITTER") { + assume(testBackend.name == MINIKUBE_TEST_BACKEND) + + sparkConf.setJars(Seq(CONTAINER_LOCAL_HELPER_JAR_PATH)) + sparkConf + .set(DRIVER_DOCKER_IMAGE, + System.getProperty("spark.docker.test.driverImage", "spark-driver-r:latest")) + .set(EXECUTOR_DOCKER_IMAGE, + System.getProperty("spark.docker.test.executorImage", "spark-executor-r:latest")) + + runSparkRAndVerifyCompletion(SPARK_R_DATAFRAME_SUBMITTER_FILE_LOCATION) + } + test("Simple submission test with the resource staging server.") { assume(testBackend.name == MINIKUBE_TEST_BACKEND) @@ -320,6 +346,16 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { otherPyFiles) } + private def runSparkRAndVerifyCompletion( + appResource: String): Unit = { + runSparkApplicationAndVerifyCompletion( + RMainAppResource(appResource), + SPARK_R_MAIN_CLASS, + Seq("name: string (nullable = true)", "1 Justin"), + Array.empty[String], + Seq.empty[String]) + } + private def runSparkApplicationAndVerifyCompletion( appResource: MainAppResource, mainClass: String, @@ -425,10 +461,15 @@ private[spark] object KubernetesSuite { val SPARK_PI_MAIN_CLASS = "org.apache.spark.deploy.k8s" + 
".integrationtest.jobs.SparkPiWithInfiniteWait" val PYSPARK_PI_MAIN_CLASS = "org.apache.spark.deploy.PythonRunner" + val SPARK_R_MAIN_CLASS = "org.apache.spark.deploy.RRunner" val PYSPARK_PI_CONTAINER_LOCAL_FILE_LOCATION = "local:///opt/spark/examples/src/main/python/pi.py" val PYSPARK_SORT_CONTAINER_LOCAL_FILE_LOCATION = "local:///opt/spark/examples/src/main/python/sort.py" + val SPARK_R_DATAFRAME_SUBMITTER_FILE_LOCATION = + "local:///opt/spark/examples/src/main/r/dataframe.R" + val SPARK_R_DATAFRAME_CONTAINER_LOCAL_FILE_LOCATION = + "src/test/R/dataframe.R" val PYSPARK_PI_SUBMITTER_LOCAL_FILE_LOCATION = "src/test/python/pi.py" val FILE_EXISTENCE_MAIN_CLASS = "org.apache.spark.deploy.k8s" + ".integrationtest.jobs.FileExistenceTest" diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/docker/SparkDockerImageBuilder.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/docker/SparkDockerImageBuilder.scala index 0e2fced70c9f7..cf2766d81859f 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/docker/SparkDockerImageBuilder.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/docker/SparkDockerImageBuilder.scala @@ -40,8 +40,10 @@ private[spark] class SparkDockerImageBuilder private val BASE_DOCKER_FILE = "dockerfiles/spark-base/Dockerfile" private val DRIVER_DOCKER_FILE = "dockerfiles/driver/Dockerfile" private val DRIVERPY_DOCKER_FILE = "dockerfiles/driver-py/Dockerfile" + private val DRIVERR_DOCKER_FILE = "dockerfiles/driver-r/Dockerfile" private val EXECUTOR_DOCKER_FILE = "dockerfiles/executor/Dockerfile" private val EXECUTORPY_DOCKER_FILE = "dockerfiles/executor-py/Dockerfile" + private val EXECUTORR_DOCKER_FILE = "dockerfiles/executor-r/Dockerfile" private val SHUFFLE_SERVICE_DOCKER_FILE = "dockerfiles/shuffle-service/Dockerfile" private val INIT_CONTAINER_DOCKER_FILE = "dockerfiles/init-container/Dockerfile" private val STAGING_SERVER_DOCKER_FILE = "dockerfiles/resource-staging-server/Dockerfile" @@ -76,21 +78,34 @@ private[spark] class SparkDockerImageBuilder val pythonExec = sys.env.get("PYSPARK_DRIVER_PYTHON") .orElse(sys.env.get("PYSPARK_PYTHON")) .getOrElse("/usr/bin/python") - val builder = new ProcessBuilder( + val python_builder = new ProcessBuilder( Seq(pythonExec, "setup.py", "sdist").asJava) - builder.directory(new File(DOCKER_BUILD_PATH.toFile, "python")) - builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize - val process = builder.start() + python_builder.directory(new File(DOCKER_BUILD_PATH.toFile, "python")) + python_builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize + val process = python_builder.start() new RedirectThread(process.getInputStream, System.out, "redirect output").start() - val exitCode = process.waitFor() - if (exitCode != 0) { - logInfo(s"exitCode: $exitCode") + val exitCodePython = process.waitFor() + if (exitCodePython != 0) { + logInfo(s"exitCode: $exitCodePython") + } + // Building R distribution environment + val r_builder = new ProcessBuilder( + Seq("bash", "install-dev.sh").asJava) + r_builder.directory(new File(DOCKER_BUILD_PATH.toFile, "R")) + r_builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize + val r_process = r_builder.start() + new RedirectThread(r_process.getInputStream, 
System.out, "redirect output").start() + val exitCodeR = r_process.waitFor() + if (exitCodeR != 0) { + logInfo(s"exitCode: $exitCodeR") } buildImage("spark-base", BASE_DOCKER_FILE) buildImage("spark-driver", DRIVER_DOCKER_FILE) buildImage("spark-driver-py", DRIVERPY_DOCKER_FILE) + buildImage("spark-driver-r", DRIVERR_DOCKER_FILE) buildImage("spark-executor", EXECUTOR_DOCKER_FILE) buildImage("spark-executor-py", EXECUTORPY_DOCKER_FILE) + buildImage("spark-executor-r", EXECUTORR_DOCKER_FILE) buildImage("spark-shuffle", SHUFFLE_SERVICE_DOCKER_FILE) buildImage("spark-resource-staging-server", STAGING_SERVER_DOCKER_FILE) buildImage("spark-init", INIT_CONTAINER_DOCKER_FILE) diff --git a/sbin/build-push-docker-images.sh b/sbin/build-push-docker-images.sh index efd6f96516d90..ad7a6079df7dc 100755 --- a/sbin/build-push-docker-images.sh +++ b/sbin/build-push-docker-images.sh @@ -23,6 +23,8 @@ declare -A path=( [spark-driver]=dockerfiles/driver/Dockerfile \ [spark-executor]=dockerfiles/executor/Dockerfile \ [spark-driver-py]=dockerfiles/driver-py/Dockerfile \ [spark-executor-py]=dockerfiles/executor-py/Dockerfile \ + [spark-driver-r]=dockerfiles/driver-r/Dockerfile \ + [spark-executor-r]=dockerfiles/executor-r/Dockerfile \ [spark-init]=dockerfiles/init-container/Dockerfile \ [spark-shuffle]=dockerfiles/shuffle-service/Dockerfile \ [spark-resource-staging-server]=dockerfiles/resource-staging-server/Dockerfile )
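Reviewer note (not part of the patch): the sketch below traces how an R application now flows from SparkSubmit into the Kubernetes submission client. The case classes are local mirrors of MainAppResource.scala above, and parseResource is a simplified stand-in for the sliding-window parse in ClientArguments; only the flag names and the RRunner main class come from the patch itself.

object RSubmissionFlowSketch extends App {
  // Local mirrors of the sealed hierarchy in MainAppResource.scala, so the sketch is self-contained.
  sealed trait MainAppResource
  case class JavaMainAppResource(primaryResource: String) extends MainAppResource
  case class PythonMainAppResource(primaryPyFile: String) extends MainAppResource
  case class RMainAppResource(primaryRFile: String) extends MainAppResource

  // Simplified stand-in for the args.sliding(2, 2) parse in ClientArguments.
  def parseResource(args: Array[String]): Option[MainAppResource] =
    args.sliding(2, 2).collectFirst {
      case Array("--primary-py-file", f)       => PythonMainAppResource(f)
      case Array("--primary-r-file", f)        => RMainAppResource(f)
      case Array("--primary-java-resource", f) => JavaMainAppResource(f)
    }

  // For a cluster-mode SparkR submission, the SparkSubmit hunk at the top of the patch now emits
  // child arguments of this shape and points --main-class at org.apache.spark.deploy.RRunner.
  val childArgs = Array(
    "--primary-r-file", "local:///opt/spark/examples/src/main/r/dataframe.R",
    "--main-class", "org.apache.spark.deploy.RRunner")

  assert(parseResource(childArgs) ==
    Some(RMainAppResource("local:///opt/spark/examples/src/main/r/dataframe.R")))
}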
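Reviewer note (not part of the patch): the expected values in RStepSuite encode the resolution rule RStep depends on. A local:// URI is already a path inside the container image, while any other URI (for example file://) is assumed to be staged into filesDownloadPath by the existing file-mounting machinery, which is also why the orchestrator appends the primary R file to spark.files. KubernetesFileUtils.resolveFilePath itself is not shown in this diff; the sketch below only approximates that behaviour and is not its actual implementation.

import java.net.URI
import java.nio.file.Paths

object ResolveRFileSketch extends App {
  // Rough approximation of what RStep expects KubernetesFileUtils.resolveFilePath to return.
  def resolveFilePath(file: String, filesDownloadPath: String): String = {
    val uri = URI.create(file)
    if (uri.getScheme == "local") {
      uri.getPath // already baked into the image; keep the container-local path
    } else {
      // anything else is downloaded into the files directory under its own file name
      s"$filesDownloadPath/${Paths.get(uri.getPath).getFileName}"
    }
  }

  // The two cases exercised by RStepSuite above:
  assert(resolveFilePath("local:///app/files/file1.R", "/var/data/spark-files") == "/app/files/file1.R")
  assert(resolveFilePath("file:///app/files/file2.R", "/var/data/spark-files") ==
    "/var/data/spark-files/file2.R")
}

The resolved value is exposed to the driver container as the R_FILE environment variable (ENV_R_FILE above), which the driver-r image's CMD hands to org.apache.spark.deploy.RRunner as its first argument.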
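Reviewer note (not part of the patch): RStep composes with the other submission steps in the usual way. Every DriverConfigurationStep takes the current KubernetesDriverSpec and returns an augmented copy, and the client folds the orchestrator's ordered step list over an initial spec. The helper below is illustrative only (runSteps and initialSpec are placeholder names, not the orchestrator's real API, and it assumes both types live in the submitsteps package shown above); RStep's entire contribution to that fold is the single R_FILE environment variable on the driver container.

import org.apache.spark.deploy.k8s.submit.submitsteps.{DriverConfigurationStep, KubernetesDriverSpec}

object StepFoldSketch {
  // Placeholder fold; the real driver-spec assembly lives in Client.scala.
  def runSteps(steps: Seq[DriverConfigurationStep], initialSpec: KubernetesDriverSpec): KubernetesDriverSpec =
    steps.foldLeft(initialSpec) { (spec, step) => step.configureDriver(spec) }
}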