diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index edf5b9c3912a0..85b76013ba5f3 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -346,8 +346,6 @@ object SparkSubmit extends CommandLineUtils {
(clusterManager, deployMode) match {
case (KUBERNETES, CLIENT) =>
printErrorAndExit("Client mode is currently not supported for Kubernetes.")
- case (KUBERNETES, CLUSTER) if args.isR =>
- printErrorAndExit("Kubernetes does not currently support R applications.")
case (STANDALONE, CLUSTER) if args.isPython =>
printErrorAndExit("Cluster deploy mode is currently not supported for python " +
"applications on standalone clusters.")
@@ -642,6 +640,9 @@ object SparkSubmit extends CommandLineUtils {
if (args.pyFiles != null) {
childArgs ++= Array("--other-py-files", args.pyFiles)
}
+ } else if (args.isR) {
+ childArgs ++= Array("--primary-r-file", args.primaryResource)
+ childArgs ++= Array("--main-class", "org.apache.spark.deploy.RRunner")
} else {
childArgs ++= Array("--primary-java-resource", args.primaryResource)
childArgs ++= Array("--main-class", args.mainClass)
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/constants.scala
index 95d7f284f86da..1dad5393ca301 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/constants.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/constants.scala
@@ -69,6 +69,7 @@ package object constants {
private[spark] val ENV_MOUNTED_FILES_DIR = "SPARK_MOUNTED_FILES_DIR"
private[spark] val ENV_PYSPARK_FILES = "PYSPARK_FILES"
private[spark] val ENV_PYSPARK_PRIMARY = "PYSPARK_PRIMARY"
+ private[spark] val ENV_R_FILE = "R_FILE"
private[spark] val ENV_JAVA_OPT_PREFIX = "SPARK_JAVA_OPT_"
private[spark] val ENV_MOUNTED_FILES_FROM_SECRET_DIR = "SPARK_MOUNTED_FILES_FROM_SECRET_DIR"
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/Client.scala
index 9d921c4f3fe08..6ed497130429f 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/Client.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/Client.scala
@@ -46,6 +46,8 @@ private[spark] object ClientArguments {
args.sliding(2, 2).toList.collect {
case Array("--primary-py-file", mainPyFile: String) =>
mainAppResource = Some(PythonMainAppResource(mainPyFile))
+ case Array("--primary-r-file", primaryRFile: String) =>
+ mainAppResource = Some(RMainAppResource(primaryRFile))
case Array("--primary-java-resource", primaryJavaResource: String) =>
mainAppResource = Some(JavaMainAppResource(primaryJavaResource))
case Array("--main-class", clazz: String) =>
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestrator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestrator.scala
index 92ec8e5d85260..70fd9b9a17707 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestrator.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestrator.scala
@@ -20,7 +20,7 @@ import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.ConfigurationUtils
import org.apache.spark.deploy.k8s.config._
import org.apache.spark.deploy.k8s.constants._
-import org.apache.spark.deploy.k8s.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverAddressConfigurationStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, InitContainerBootstrapStep, MountSecretsStep, MountSmallLocalFilesStep, PythonStep}
+import org.apache.spark.deploy.k8s.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverAddressConfigurationStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, InitContainerBootstrapStep, MountSecretsStep, MountSmallLocalFilesStep, PythonStep, RStep}
import org.apache.spark.deploy.k8s.submit.submitsteps.initcontainer.InitContainerConfigurationStepsOrchestrator
import org.apache.spark.deploy.k8s.submit.submitsteps.LocalDirectoryMountConfigurationStep
import org.apache.spark.launcher.SparkLauncher
@@ -64,6 +64,11 @@ private[spark] class DriverConfigurationStepsOrchestrator(
Option(resource)
case _ => Option.empty
}
+ val additionalMainAppRFile = mainAppResource match {
+ case RMainAppResource(resource) if resource != SparkLauncher.NO_RESOURCE =>
+ Option(resource)
+ case _ => Option.empty
+ }
val sparkJars = submissionSparkConf.getOption("spark.jars")
.map(_.split(","))
.getOrElse(Array.empty[String]) ++
@@ -72,6 +77,7 @@ private[spark] class DriverConfigurationStepsOrchestrator(
.map(_.split(","))
.getOrElse(Array.empty[String]) ++
additionalMainAppPythonFile.toSeq ++
+ additionalMainAppRFile.toSeq ++
additionalPythonFiles
val driverCustomLabels = ConfigurationUtils.parsePrefixedKeyValuePairs(
submissionSparkConf,
@@ -108,9 +114,11 @@ private[spark] class DriverConfigurationStepsOrchestrator(
val localDirectoryMountConfigurationStep = new LocalDirectoryMountConfigurationStep(
submissionSparkConf)
- val pythonStep = mainAppResource match {
+ val resourceStep = mainAppResource match {
case PythonMainAppResource(mainPyResource) =>
Option(new PythonStep(mainPyResource, additionalPythonFiles, filesDownloadPath))
+ case RMainAppResource(mainRFile) =>
+ Option(new RStep(mainRFile, filesDownloadPath))
case _ => Option.empty[DriverConfigurationStep]
}
@@ -188,7 +196,7 @@ private[spark] class DriverConfigurationStepsOrchestrator(
dependencyResolutionStep,
localDirectoryMountConfigurationStep) ++
submittedDependenciesBootstrapSteps ++
- pythonStep.toSeq ++
+ resourceStep.toSeq ++
mountSecretsStep.toSeq
}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/MainAppResource.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/MainAppResource.scala
index f806e65974fcc..5b70b3e38904e 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/MainAppResource.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/MainAppResource.scala
@@ -20,4 +20,6 @@ private[spark] sealed trait MainAppResource
private[spark] case class PythonMainAppResource(primaryPyFile: String) extends MainAppResource
+private[spark] case class RMainAppResource(primaryRFile: String) extends MainAppResource
+
private[spark] case class JavaMainAppResource(primaryResource: String) extends MainAppResource
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/RStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/RStep.scala
new file mode 100644
index 0000000000000..c4d1d63c2c4f4
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/RStep.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.submitsteps
+
+import io.fabric8.kubernetes.api.model.ContainerBuilder
+
+import org.apache.spark.deploy.k8s.constants._
+import org.apache.spark.deploy.k8s.submit.KubernetesFileUtils
+
+private[spark] class RStep(
+ mainRFile: String,
+ filesDownloadPath: String) extends DriverConfigurationStep {
+
+ override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
+ val withRFileContainer = new ContainerBuilder(driverSpec.driverContainer)
+ .addNewEnv()
+ .withName(ENV_R_FILE)
+ .withValue(KubernetesFileUtils.resolveFilePath(mainRFile, filesDownloadPath))
+ .endEnv()
+ driverSpec.copy(driverContainer = withRFileContainer.build())
+ }
+}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestratorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestratorSuite.scala
index 1199f033cf06a..cd80bc53b2211 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestratorSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestratorSuite.scala
@@ -18,7 +18,7 @@ package org.apache.spark.deploy.k8s.submit
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s.config._
-import org.apache.spark.deploy.k8s.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverAddressConfigurationStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, InitContainerBootstrapStep, LocalDirectoryMountConfigurationStep, MountSecretsStep, MountSmallLocalFilesStep, PythonStep}
+import org.apache.spark.deploy.k8s.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverAddressConfigurationStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, InitContainerBootstrapStep, LocalDirectoryMountConfigurationStep, MountSecretsStep, MountSmallLocalFilesStep, PythonStep, RStep}
private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunSuite {
@@ -45,7 +45,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS
APP_NAME,
MAIN_CLASS,
APP_ARGS,
- ADDITIONAL_PYTHON_FILES,
+ Seq.empty[String],
sparkConf)
validateStepTypes(
orchestrator,
@@ -69,7 +69,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS
APP_NAME,
MAIN_CLASS,
APP_ARGS,
- ADDITIONAL_PYTHON_FILES,
+ Seq.empty[String],
sparkConf)
validateStepTypes(
orchestrator,
@@ -85,15 +85,15 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS
val sparkConf = new SparkConf(false)
val mainAppResource = PythonMainAppResource("local:///var/apps/python/main.py")
val orchestrator = new DriverConfigurationStepsOrchestrator(
- NAMESPACE,
- APP_ID,
- LAUNCH_TIME,
- mainAppResource,
- APP_NAME,
- MAIN_CLASS,
- APP_ARGS,
- ADDITIONAL_PYTHON_FILES,
- sparkConf)
+ NAMESPACE,
+ APP_ID,
+ LAUNCH_TIME,
+ mainAppResource,
+ APP_NAME,
+ MAIN_CLASS,
+ APP_ARGS,
+ ADDITIONAL_PYTHON_FILES,
+ sparkConf)
validateStepTypes(
orchestrator,
classOf[BaseDriverConfigurationStep],
@@ -104,6 +104,30 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS
classOf[PythonStep])
}
+ test("Submission steps with R file.") {
+ val sparkConf = new SparkConf(false)
+ val mainAppResource = RMainAppResource("local:///var/apps/r/main.R")
+ val orchestrator = new DriverConfigurationStepsOrchestrator(
+ NAMESPACE,
+ APP_ID,
+ LAUNCH_TIME,
+ mainAppResource,
+ APP_NAME,
+ MAIN_CLASS,
+ APP_ARGS,
+ Seq.empty[String],
+ sparkConf)
+ validateStepTypes(
+ orchestrator,
+ classOf[BaseDriverConfigurationStep],
+ classOf[DriverAddressConfigurationStep],
+ classOf[DriverKubernetesCredentialsStep],
+ classOf[DependencyResolutionStep],
+ classOf[LocalDirectoryMountConfigurationStep],
+ classOf[RStep])
+ }
+
+
test("Only local files without a resource staging server.") {
val sparkConf = new SparkConf(false).set("spark.files", "/var/spark/file1.txt")
val mainAppResource = JavaMainAppResource("local:///var/apps/jars/main.jar")
@@ -115,7 +139,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS
APP_NAME,
MAIN_CLASS,
APP_ARGS,
- ADDITIONAL_PYTHON_FILES,
+ Seq.empty[String],
sparkConf)
validateStepTypes(
orchestrator,
@@ -140,7 +164,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS
APP_NAME,
MAIN_CLASS,
APP_ARGS,
- ADDITIONAL_PYTHON_FILES,
+ Seq.empty[String],
sparkConf)
validateStepTypes(
orchestrator,
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/RStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/RStepSuite.scala
new file mode 100644
index 0000000000000..1c69edde10832
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/RStepSuite.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.submitsteps
+
+import io.fabric8.kubernetes.api.model._
+import org.scalatest.BeforeAndAfter
+import scala.collection.JavaConverters._
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+
+class RStepSuite extends SparkFunSuite with BeforeAndAfter {
+ private val FILE_DOWNLOAD_PATH = "/var/data/spark-files"
+ private val R_PRIMARY_FILE_OP1 = "local:///app/files/file1.R"
+ private val RESOLVED_R_PRIMARY_FILE_OP1 = "/app/files/file1.R"
+ private val R_PRIMARY_FILE_OP2 = "file:///app/files/file2.R"
+ private val RESOLVED_R_PRIMARY_FILE_OP2 = FILE_DOWNLOAD_PATH + "/file2.R"
+
+ test("testing RSpark with local file") {
+ val rStep = new RStep(
+ R_PRIMARY_FILE_OP1,
+ FILE_DOWNLOAD_PATH)
+ val returnedDriverContainer = rStep.configureDriver(
+ KubernetesDriverSpec(
+ new Pod(),
+ new Container(),
+ Seq.empty[HasMetadata],
+ new SparkConf))
+ assert(returnedDriverContainer.driverContainer.getEnv
+ .asScala.map(env => (env.getName, env.getValue)).toMap ===
+ Map(
+ "R_FILE" -> RESOLVED_R_PRIMARY_FILE_OP1))
+ }
+
+ test("testing RSpark with remote file") {
+ val rStep = new RStep(
+ R_PRIMARY_FILE_OP2,
+ FILE_DOWNLOAD_PATH)
+ val returnedDriverContainer = rStep.configureDriver(
+ KubernetesDriverSpec(
+ new Pod(),
+ new Container(),
+ Seq.empty[HasMetadata],
+ new SparkConf))
+ assert(returnedDriverContainer.driverContainer.getEnv
+ .asScala.map(env => (env.getName, env.getValue)).toMap ===
+ Map(
+ "R_FILE" -> RESOLVED_R_PRIMARY_FILE_OP2))
+ }
+
+}
diff --git a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver-r/Dockerfile b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver-r/Dockerfile
new file mode 100644
index 0000000000000..26b1231ae2ec9
--- /dev/null
+++ b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver-r/Dockerfile
@@ -0,0 +1,39 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+FROM spark-base
+
+# If this docker file is being used in the context of building your images from a Spark distribution, the docker build
+# command should be invoked from the top level directory of the Spark distribution. E.g.:
+# docker build -t spark-driver-r:latest -f dockerfiles/driver-r/Dockerfile .
+
+ADD examples /opt/spark/examples
+ADD R /opt/spark/R
+
+RUN apk add --no-cache R R-dev
+
+ENV R_HOME /usr/lib/R
+
+CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \
+ env | grep SPARK_JAVA_OPT_ | sed 's/[^=]*=\(.*\)/\1/g' > /tmp/java_opts.txt && \
+ readarray -t SPARK_DRIVER_JAVA_OPTS < /tmp/java_opts.txt && \
+ if ! [ -z ${SPARK_MOUNTED_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_MOUNTED_CLASSPATH:$SPARK_CLASSPATH"; fi && \
+ if ! [ -z ${SPARK_SUBMIT_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_SUBMIT_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
+ if ! [ -z ${SPARK_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
+ if ! [ -z ${SPARK_MOUNTED_FILES_DIR+x} ]; then cp -R "$SPARK_MOUNTED_FILES_DIR/." .; fi && \
+ if ! [ -z ${SPARK_MOUNTED_FILES_FROM_SECRET_DIR+x} ]; then cp -R "$SPARK_MOUNTED_FILES_FROM_SECRET_DIR/." .; fi && \
+ ${JAVA_HOME}/bin/java "${SPARK_DRIVER_JAVA_OPTS[@]}" -cp $SPARK_CLASSPATH -Xms$SPARK_DRIVER_MEMORY -Xmx$SPARK_DRIVER_MEMORY $SPARK_DRIVER_CLASS $R_FILE $SPARK_DRIVER_ARGS
diff --git a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor-r/Dockerfile b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor-r/Dockerfile
new file mode 100644
index 0000000000000..fc6ca47df24dc
--- /dev/null
+++ b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor-r/Dockerfile
@@ -0,0 +1,38 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+FROM spark-base
+
+# If this docker file is being used in the context of building your images from a Spark distribution, the docker build
+# command should be invoked from the top level directory of the Spark distribution. E.g.:
+# docker build -t spark-executor-r:latest -f dockerfiles/executor-r/Dockerfile .
+
+ADD examples /opt/spark/examples
+ADD R /opt/spark/R
+
+RUN apk add --no-cache R R-dev
+
+ENV R_HOME /usr/lib/R
+
+CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \
+ env | grep SPARK_JAVA_OPT_ | sed 's/[^=]*=\(.*\)/\1/g' > /tmp/java_opts.txt && \
+ readarray -t SPARK_EXECUTOR_JAVA_OPTS < /tmp/java_opts.txt && \
+ if ! [ -z ${SPARK_MOUNTED_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_MOUNTED_CLASSPATH:$SPARK_CLASSPATH"; fi && \
+ if ! [ -z ${SPARK_EXECUTOR_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXECUTOR_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
+ if ! [ -z ${SPARK_MOUNTED_FILES_DIR+x} ]; then cp -R "$SPARK_MOUNTED_FILES_DIR/." .; fi && \
+ if ! [ -z ${SPARK_MOUNTED_FILES_FROM_SECRET_DIR+x} ]; then cp -R "$SPARK_MOUNTED_FILES_FROM_SECRET_DIR/." .; fi && \
+ ${JAVA_HOME}/bin/java "${SPARK_EXECUTOR_JAVA_OPTS[@]}" -Dspark.executor.port=$SPARK_EXECUTOR_PORT -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp $SPARK_CLASSPATH org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP
diff --git a/resource-managers/kubernetes/integration-tests/pom.xml b/resource-managers/kubernetes/integration-tests/pom.xml
index d95f2f68db58f..dda4743d607b2 100644
--- a/resource-managers/kubernetes/integration-tests/pom.xml
+++ b/resource-managers/kubernetes/integration-tests/pom.xml
@@ -239,6 +239,21 @@
+            <execution>
+              <id>copy-integration-r</id>
+              <phase>pre-integration-test</phase>
+              <goals>
+                <goal>copy-resources</goal>
+              </goals>
+              <configuration>
+                <outputDirectory>${project.build.directory}/docker/R</outputDirectory>
+                <resources>
+                  <resource>
+                    <directory>${project.parent.basedir}/R</directory>
+                  </resource>
+                </resources>
+              </configuration>
+            </execution>
             <execution>
               <id>copy-integration-data</id>
               <phase>pre-integration-test</phase>
diff --git a/resource-managers/kubernetes/integration-tests/src/test/R/dataframe.R b/resource-managers/kubernetes/integration-tests/src/test/R/dataframe.R
new file mode 100644
index 0000000000000..311350497f873
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/src/test/R/dataframe.R
@@ -0,0 +1,59 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# To run this example use
+# ./bin/spark-submit examples/src/main/r/dataframe.R
+
+library(SparkR)
+
+# Initialize SparkSession
+sparkR.session(appName = "SparkR-DataFrame-example")
+
+# Create a simple local data.frame
+localDF <- data.frame(name=c("John", "Smith", "Sarah"), age=c(19, 23, 18))
+
+# Convert local data frame to a SparkDataFrame
+df <- createDataFrame(localDF)
+
+# Print its schema
+printSchema(df)
+# root
+# |-- name: string (nullable = true)
+# |-- age: double (nullable = true)
+
+# Create a DataFrame from a JSON file
+path <- file.path(Sys.getenv("SPARK_HOME"), "examples/src/main/resources/people.json")
+peopleDF <- read.json(path)
+printSchema(peopleDF)
+# root
+# |-- age: long (nullable = true)
+# |-- name: string (nullable = true)
+
+# Register this DataFrame as a table.
+createOrReplaceTempView(peopleDF, "people")
+
+# SQL statements can be run by using the sql methods
+teenagers <- sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
+
+# Call collect to get a local data.frame
+teenagersLocalDF <- collect(teenagers)
+
+# Print the teenagers in our dataset
+print(teenagersLocalDF)
+
+# Stop the SparkSession now
+sparkR.session.stop()
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
index 27041207ffdce..9d48d488bf967 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
@@ -34,7 +34,7 @@ import org.apache.spark.deploy.k8s.config._
import org.apache.spark.deploy.k8s.integrationtest.backend.IntegrationTestBackendFactory
import org.apache.spark.deploy.k8s.integrationtest.backend.minikube.Minikube
import org.apache.spark.deploy.k8s.integrationtest.constants.MINIKUBE_TEST_BACKEND
-import org.apache.spark.deploy.k8s.submit.{Client, ClientArguments, JavaMainAppResource, KeyAndCertPem, MainAppResource, PythonMainAppResource}
+import org.apache.spark.deploy.k8s.submit.{Client, ClientArguments, JavaMainAppResource, KeyAndCertPem, MainAppResource, PythonMainAppResource, RMainAppResource}
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.util.Utils
@@ -101,6 +101,32 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
runPySparkPiAndVerifyCompletion(PYSPARK_PI_CONTAINER_LOCAL_FILE_LOCATION, Seq.empty[String])
}
+ test("Run SparkR Job on file locally") {
+ assume(testBackend.name == MINIKUBE_TEST_BACKEND)
+
+ launchStagingServer(SSLOptions(), None)
+ sparkConf
+ .set(DRIVER_DOCKER_IMAGE,
+ System.getProperty("spark.docker.test.driverImage", "spark-driver-r:latest"))
+ .set(EXECUTOR_DOCKER_IMAGE,
+ System.getProperty("spark.docker.test.executorImage", "spark-executor-r:latest"))
+
+ runSparkRAndVerifyCompletion(SPARK_R_DATAFRAME_SUBMITTER_LOCAL_FILE_LOCATION)
+ }
+
+ test("Run SparkR Job on file from SUBMITTER") {
+ assume(testBackend.name == MINIKUBE_TEST_BACKEND)
+
+ sparkConf.setJars(Seq(CONTAINER_LOCAL_HELPER_JAR_PATH))
+ sparkConf
+ .set(DRIVER_DOCKER_IMAGE,
+ System.getProperty("spark.docker.test.driverImage", "spark-driver-r:latest"))
+ .set(EXECUTOR_DOCKER_IMAGE,
+ System.getProperty("spark.docker.test.executorImage", "spark-executor-r:latest"))
+
+ runSparkRAndVerifyCompletion(SPARK_R_DATAFRAME_CONTAINER_LOCAL_FILE_LOCATION)
+ }
+
test("Simple submission test with the resource staging server.") {
assume(testBackend.name == MINIKUBE_TEST_BACKEND)
@@ -320,6 +346,16 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
otherPyFiles)
}
+ private def runSparkRAndVerifyCompletion(
+ appResource: String): Unit = {
+ runSparkApplicationAndVerifyCompletion(
+ RMainAppResource(appResource),
+ SPARK_R_MAIN_CLASS,
+ Seq("name: string (nullable = true)", "1 Justin"),
+ Array.empty[String],
+ Seq.empty[String])
+ }
+
private def runSparkApplicationAndVerifyCompletion(
appResource: MainAppResource,
mainClass: String,
@@ -425,10 +461,15 @@ private[spark] object KubernetesSuite {
val SPARK_PI_MAIN_CLASS = "org.apache.spark.deploy.k8s" +
".integrationtest.jobs.SparkPiWithInfiniteWait"
val PYSPARK_PI_MAIN_CLASS = "org.apache.spark.deploy.PythonRunner"
+ val SPARK_R_MAIN_CLASS = "org.apache.spark.deploy.RRunner"
val PYSPARK_PI_CONTAINER_LOCAL_FILE_LOCATION =
"local:///opt/spark/examples/src/main/python/pi.py"
val PYSPARK_SORT_CONTAINER_LOCAL_FILE_LOCATION =
"local:///opt/spark/examples/src/main/python/sort.py"
+ val SPARK_R_DATAFRAME_CONTAINER_LOCAL_FILE_LOCATION =
+ "local:///opt/spark/examples/src/main/r/dataframe.R"
+ val SPARK_R_DATAFRAME_SUBMITTER_LOCAL_FILE_LOCATION =
+ "src/test/R/dataframe.R"
val PYSPARK_PI_SUBMITTER_LOCAL_FILE_LOCATION = "src/test/python/pi.py"
val FILE_EXISTENCE_MAIN_CLASS = "org.apache.spark.deploy.k8s" +
".integrationtest.jobs.FileExistenceTest"
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/docker/SparkDockerImageBuilder.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/docker/SparkDockerImageBuilder.scala
index 0e2fced70c9f7..cf2766d81859f 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/docker/SparkDockerImageBuilder.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/docker/SparkDockerImageBuilder.scala
@@ -40,8 +40,10 @@ private[spark] class SparkDockerImageBuilder
private val BASE_DOCKER_FILE = "dockerfiles/spark-base/Dockerfile"
private val DRIVER_DOCKER_FILE = "dockerfiles/driver/Dockerfile"
private val DRIVERPY_DOCKER_FILE = "dockerfiles/driver-py/Dockerfile"
+ private val DRIVERR_DOCKER_FILE = "dockerfiles/driver-r/Dockerfile"
private val EXECUTOR_DOCKER_FILE = "dockerfiles/executor/Dockerfile"
private val EXECUTORPY_DOCKER_FILE = "dockerfiles/executor-py/Dockerfile"
+ private val EXECUTORR_DOCKER_FILE = "dockerfiles/executor-r/Dockerfile"
private val SHUFFLE_SERVICE_DOCKER_FILE = "dockerfiles/shuffle-service/Dockerfile"
private val INIT_CONTAINER_DOCKER_FILE = "dockerfiles/init-container/Dockerfile"
private val STAGING_SERVER_DOCKER_FILE = "dockerfiles/resource-staging-server/Dockerfile"
@@ -76,21 +78,36 @@ private[spark] class SparkDockerImageBuilder
val pythonExec = sys.env.get("PYSPARK_DRIVER_PYTHON")
.orElse(sys.env.get("PYSPARK_PYTHON"))
.getOrElse("/usr/bin/python")
- val builder = new ProcessBuilder(
+ val pythonBuilder = new ProcessBuilder(
Seq(pythonExec, "setup.py", "sdist").asJava)
- builder.directory(new File(DOCKER_BUILD_PATH.toFile, "python"))
- builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize
- val process = builder.start()
+ pythonBuilder.directory(new File(DOCKER_BUILD_PATH.toFile, "python"))
+ pythonBuilder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize
+ val process = pythonBuilder.start()
new RedirectThread(process.getInputStream, System.out, "redirect output").start()
- val exitCode = process.waitFor()
- if (exitCode != 0) {
- logInfo(s"exitCode: $exitCode")
+ val exitCodePython = process.waitFor()
+ if (exitCodePython != 0) {
+ logInfo(s"exitCode: $exitCodePython")
+ }
+ // Build the SparkR package so it can be copied into the R docker images
+ val rBuilder = new ProcessBuilder(
+ Seq("bash", "install-dev.sh").asJava)
+ rBuilder.directory(new File(DOCKER_BUILD_PATH.toFile, "R"))
+ rBuilder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize
+ val rProcess = rBuilder.start()
+ new RedirectThread(rProcess.getInputStream, System.out, "redirect output").start()
+ val exitCodeR = rProcess.waitFor()
+ if (exitCodeR != 0) {
+ logInfo(s"exitCode: $exitCodeR")
}
buildImage("spark-base", BASE_DOCKER_FILE)
buildImage("spark-driver", DRIVER_DOCKER_FILE)
buildImage("spark-driver-py", DRIVERPY_DOCKER_FILE)
+ buildImage("spark-driver-r", DRIVERR_DOCKER_FILE)
buildImage("spark-executor", EXECUTOR_DOCKER_FILE)
buildImage("spark-executor-py", EXECUTORPY_DOCKER_FILE)
+ buildImage("spark-executor-r", EXECUTORR_DOCKER_FILE)
buildImage("spark-shuffle", SHUFFLE_SERVICE_DOCKER_FILE)
buildImage("spark-resource-staging-server", STAGING_SERVER_DOCKER_FILE)
buildImage("spark-init", INIT_CONTAINER_DOCKER_FILE)
diff --git a/sbin/build-push-docker-images.sh b/sbin/build-push-docker-images.sh
index efd6f96516d90..ad7a6079df7dc 100755
--- a/sbin/build-push-docker-images.sh
+++ b/sbin/build-push-docker-images.sh
@@ -23,6 +23,8 @@ declare -A path=( [spark-driver]=dockerfiles/driver/Dockerfile \
[spark-executor]=dockerfiles/executor/Dockerfile \
[spark-driver-py]=dockerfiles/driver-py/Dockerfile \
[spark-executor-py]=dockerfiles/executor-py/Dockerfile \
+ [spark-driver-r]=dockerfiles/driver-r/Dockerfile \
+ [spark-executor-r]=dockerfiles/executor-r/Dockerfile \
[spark-init]=dockerfiles/init-container/Dockerfile \
[spark-shuffle]=dockerfiles/shuffle-service/Dockerfile \
[spark-resource-staging-server]=dockerfiles/resource-staging-server/Dockerfile )
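With spark-driver-r and spark-executor-r added to the image map above, the existing helper builds and pushes the R images together with the others. A usage sketch, assuming the script's existing -r (repository) and -t (tag) options and its build/push sub-commands; the repository and tag below are placeholders.

    # hypothetical invocation from the root of an unpacked Spark distribution
    ./sbin/build-push-docker-images.sh -r docker.io/myrepo -t v2.2.0-kubernetes build
    ./sbin/build-push-docker-images.sh -r docker.io/myrepo -t v2.2.0-kubernetes push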