Skip to content
This repository has been archived by the owner on Jan 9, 2020. It is now read-only.

SparkR Support #507

Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
Original file line number Diff line number Diff line change
Expand Up @@ -346,8 +346,6 @@ object SparkSubmit extends CommandLineUtils {
(clusterManager, deployMode) match {
case (KUBERNETES, CLIENT) =>
printErrorAndExit("Client mode is currently not supported for Kubernetes.")
case (KUBERNETES, CLUSTER) if args.isR =>
printErrorAndExit("Kubernetes does not currently support R applications.")
case (STANDALONE, CLUSTER) if args.isPython =>
printErrorAndExit("Cluster deploy mode is currently not supported for python " +
"applications on standalone clusters.")
Expand Down Expand Up @@ -642,6 +640,9 @@ object SparkSubmit extends CommandLineUtils {
if (args.pyFiles != null) {
childArgs ++= Array("--other-py-files", args.pyFiles)
}
} else if (args.isR) {
childArgs ++= Array("--primary-r-file", args.primaryResource)
childArgs ++= Array("--main-class", "org.apache.spark.deploy.RRunner")
} else {
childArgs ++= Array("--primary-java-resource", args.primaryResource)
childArgs ++= Array("--main-class", args.mainClass)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ package object constants {
private[spark] val ENV_MOUNTED_FILES_DIR = "SPARK_MOUNTED_FILES_DIR"
private[spark] val ENV_PYSPARK_FILES = "PYSPARK_FILES"
private[spark] val ENV_PYSPARK_PRIMARY = "PYSPARK_PRIMARY"
private[spark] val ENV_R_FILE = "R_FILE"
private[spark] val ENV_JAVA_OPT_PREFIX = "SPARK_JAVA_OPT_"
private[spark] val ENV_MOUNTED_FILES_FROM_SECRET_DIR = "SPARK_MOUNTED_FILES_FROM_SECRET_DIR"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ private[spark] object ClientArguments {
args.sliding(2, 2).toList.collect {
case Array("--primary-py-file", mainPyFile: String) =>
mainAppResource = Some(PythonMainAppResource(mainPyFile))
case Array("--primary-r-file", primaryRFile: String) =>
mainAppResource = Some(RMainAppResource(primaryRFile))
case Array("--primary-java-resource", primaryJavaResource: String) =>
mainAppResource = Some(JavaMainAppResource(primaryJavaResource))
case Array("--main-class", clazz: String) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.ConfigurationUtils
import org.apache.spark.deploy.k8s.config._
import org.apache.spark.deploy.k8s.constants._
import org.apache.spark.deploy.k8s.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverAddressConfigurationStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, InitContainerBootstrapStep, MountSecretsStep, MountSmallLocalFilesStep, PythonStep}
import org.apache.spark.deploy.k8s.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverAddressConfigurationStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, InitContainerBootstrapStep, MountSecretsStep, MountSmallLocalFilesStep, PythonStep, RStep}
import org.apache.spark.deploy.k8s.submit.submitsteps.initcontainer.InitContainerConfigurationStepsOrchestrator
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.util.{SystemClock, Utils}
Expand Down Expand Up @@ -63,6 +63,11 @@ private[spark] class DriverConfigurationStepsOrchestrator(
Option(resource)
case _ => Option.empty
}
val additionalMainAppRFile = mainAppResource match {
case RMainAppResource(resource) if resource != SparkLauncher.NO_RESOURCE =>
Option(resource)
case _ => Option.empty
}
val sparkJars = submissionSparkConf.getOption("spark.jars")
.map(_.split(","))
.getOrElse(Array.empty[String]) ++
Expand All @@ -71,6 +76,7 @@ private[spark] class DriverConfigurationStepsOrchestrator(
.map(_.split(","))
.getOrElse(Array.empty[String]) ++
additionalMainAppPythonFile.toSeq ++
additionalMainAppRFile.toSeq ++
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Important here that, similar to Python Primary Resource, that the R File is distributed via --files.

additionalPythonFiles
val driverCustomLabels = ConfigurationUtils.parsePrefixedKeyValuePairs(
submissionSparkConf,
Expand Down Expand Up @@ -104,9 +110,11 @@ private[spark] class DriverConfigurationStepsOrchestrator(
val kubernetesCredentialsStep = new DriverKubernetesCredentialsStep(
submissionSparkConf, kubernetesResourceNamePrefix)

val pythonStep = mainAppResource match {
val resourceStep = mainAppResource match {
case PythonMainAppResource(mainPyResource) =>
Option(new PythonStep(mainPyResource, additionalPythonFiles, filesDownloadPath))
case RMainAppResource(mainRFile) =>
Option(new RStep(mainRFile, filesDownloadPath))
case _ => Option.empty[DriverConfigurationStep]
}

Expand Down Expand Up @@ -183,7 +191,7 @@ private[spark] class DriverConfigurationStepsOrchestrator(
kubernetesCredentialsStep,
dependencyResolutionStep) ++
submittedDependenciesBootstrapSteps ++
pythonStep.toSeq ++
resourceStep.toSeq ++
mountSecretsStep.toSeq
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,6 @@ private[spark] sealed trait MainAppResource

private[spark] case class PythonMainAppResource(primaryPyFile: String) extends MainAppResource

private[spark] case class RMainAppResource(primaryRFile: String) extends MainAppResource

private[spark] case class JavaMainAppResource(primaryResource: String) extends MainAppResource
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.submit.submitsteps

import io.fabric8.kubernetes.api.model.ContainerBuilder

import org.apache.spark.deploy.k8s.constants._
import org.apache.spark.deploy.k8s.submit.KubernetesFileUtils

private[spark] class RStep(
mainRFile: String,
filesDownloadPath: String) extends DriverConfigurationStep {

override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
val withRFileContainer = new ContainerBuilder(driverSpec.driverContainer)
.addNewEnv()
.withName(ENV_R_FILE)
.withValue(KubernetesFileUtils.resolveFilePath(mainRFile, filesDownloadPath))
.endEnv()
driverSpec.copy(driverContainer = withRFileContainer.build())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package org.apache.spark.deploy.k8s.submit

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s.config._
import org.apache.spark.deploy.k8s.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverAddressConfigurationStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, InitContainerBootstrapStep, MountSecretsStep, MountSmallLocalFilesStep, PythonStep}
import org.apache.spark.deploy.k8s.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverAddressConfigurationStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, InitContainerBootstrapStep, MountSecretsStep, MountSmallLocalFilesStep, PythonStep, RStep}

private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunSuite {

Expand All @@ -45,7 +45,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS
APP_NAME,
MAIN_CLASS,
APP_ARGS,
ADDITIONAL_PYTHON_FILES,
Seq.empty[String],
sparkConf)
validateStepTypes(
orchestrator,
Expand All @@ -68,7 +68,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS
APP_NAME,
MAIN_CLASS,
APP_ARGS,
ADDITIONAL_PYTHON_FILES,
Seq.empty[String],
sparkConf)
validateStepTypes(
orchestrator,
Expand All @@ -83,24 +83,47 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS
val sparkConf = new SparkConf(false)
val mainAppResource = PythonMainAppResource("local:///var/apps/python/main.py")
val orchestrator = new DriverConfigurationStepsOrchestrator(
NAMESPACE,
APP_ID,
LAUNCH_TIME,
mainAppResource,
APP_NAME,
MAIN_CLASS,
APP_ARGS,
ADDITIONAL_PYTHON_FILES,
sparkConf)
NAMESPACE,
APP_ID,
LAUNCH_TIME,
mainAppResource,
APP_NAME,
MAIN_CLASS,
APP_ARGS,
ADDITIONAL_PYTHON_FILES,
sparkConf)
validateStepTypes(
orchestrator,
classOf[BaseDriverConfigurationStep],
classOf[DriverAddressConfigurationStep],
classOf[DriverKubernetesCredentialsStep],
classOf[DependencyResolutionStep],
classOf[PythonStep])
orchestrator,
classOf[BaseDriverConfigurationStep],
classOf[DriverAddressConfigurationStep],
classOf[DriverKubernetesCredentialsStep],
classOf[DependencyResolutionStep],
classOf[PythonStep])
}

test("Submission steps with R file.") {
val sparkConf = new SparkConf(false)
val mainAppResource = RMainAppResource("local:///var/apps/r/main.R")
val orchestrator = new DriverConfigurationStepsOrchestrator(
NAMESPACE,
APP_ID,
LAUNCH_TIME,
mainAppResource,
APP_NAME,
MAIN_CLASS,
APP_ARGS,
Seq.empty[String],
sparkConf)
validateStepTypes(
orchestrator,
classOf[BaseDriverConfigurationStep],
classOf[DriverAddressConfigurationStep],
classOf[DriverKubernetesCredentialsStep],
classOf[DependencyResolutionStep],
classOf[RStep])
}


test("Only local files without a resource staging server.") {
val sparkConf = new SparkConf(false).set("spark.files", "/var/spark/file1.txt")
val mainAppResource = JavaMainAppResource("local:///var/apps/jars/main.jar")
Expand All @@ -112,7 +135,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS
APP_NAME,
MAIN_CLASS,
APP_ARGS,
ADDITIONAL_PYTHON_FILES,
Seq.empty[String],
sparkConf)
validateStepTypes(
orchestrator,
Expand All @@ -136,7 +159,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS
APP_NAME,
MAIN_CLASS,
APP_ARGS,
ADDITIONAL_PYTHON_FILES,
Seq.empty[String],
sparkConf)
validateStepTypes(
orchestrator,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.submit.submitsteps

import io.fabric8.kubernetes.api.model._
import org.scalatest.BeforeAndAfter
import scala.collection.JavaConverters._

import org.apache.spark.{SparkConf, SparkFunSuite}

class RStepSuite extends SparkFunSuite with BeforeAndAfter {
private val FILE_DOWNLOAD_PATH = "/var/data/spark-files"
private val R_PRIMARY_FILE_OP1 = "local:///app/files/file1.R"
private val RESOLVED_R_PRIMARY_FILE_OP1 = "/app/files/file1.R"
private val R_PRIMARY_FILE_OP2 = "file:///app/files/file2.R"
private val RESOLVED_R_PRIMARY_FILE_OP2 = FILE_DOWNLOAD_PATH + "/file2.R"

test("testing RSpark with local file") {
val rStep = new RStep(
R_PRIMARY_FILE_OP1,
FILE_DOWNLOAD_PATH)
val returnedDriverContainer = rStep.configureDriver(
KubernetesDriverSpec(
new Pod(),
new Container(),
Seq.empty[HasMetadata],
new SparkConf))
assert(returnedDriverContainer.driverContainer.getEnv
.asScala.map(env => (env.getName, env.getValue)).toMap ===
Map(
"R_FILE" -> RESOLVED_R_PRIMARY_FILE_OP1))
}

test("testing RSpark with remote file") {
val rStep = new RStep(
R_PRIMARY_FILE_OP2,
FILE_DOWNLOAD_PATH)
val returnedDriverContainer = rStep.configureDriver(
KubernetesDriverSpec(
new Pod(),
new Container(),
Seq.empty[HasMetadata],
new SparkConf))
assert(returnedDriverContainer.driverContainer.getEnv
.asScala.map(env => (env.getName, env.getValue)).toMap ===
Map(
"R_FILE" -> RESOLVED_R_PRIMARY_FILE_OP2))
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

FROM spark-base

# If this docker file is being used in the context of building your images from a Spark distribution, the docker build
# command should be invoked from the top level directory of the Spark distribution. E.g.:
# docker build -t spark-driver-r:latest -f dockerfiles/driver-r/Dockerfile .

ADD examples /opt/spark/examples
ADD R /opt/spark/R

RUN apk add --no-cache R R-dev

ENV R_HOME /usr/lib/R

CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \
env | grep SPARK_JAVA_OPT_ | sed 's/[^=]*=\(.*\)/\1/g' > /tmp/java_opts.txt && \
readarray -t SPARK_DRIVER_JAVA_OPTS < /tmp/java_opts.txt && \
if ! [ -z ${SPARK_MOUNTED_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_MOUNTED_CLASSPATH:$SPARK_CLASSPATH"; fi && \
if ! [ -z ${SPARK_SUBMIT_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_SUBMIT_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
if ! [ -z ${SPARK_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
if ! [ -z ${SPARK_MOUNTED_FILES_DIR+x} ]; then cp -R "$SPARK_MOUNTED_FILES_DIR/." .; fi && \
if ! [ -z ${SPARK_MOUNTED_FILES_FROM_SECRET_DIR+x} ]; then cp -R "$SPARK_MOUNTED_FILES_FROM_SECRET_DIR/." .; fi && \
${JAVA_HOME}/bin/java "${SPARK_DRIVER_JAVA_OPTS[@]}" -cp $SPARK_CLASSPATH -Xms$SPARK_DRIVER_MEMORY -Xmx$SPARK_DRIVER_MEMORY $SPARK_DRIVER_CLASS $R_FILE $SPARK_DRIVER_ARGS
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

FROM spark-base

# If this docker file is being used in the context of building your images from a Spark distribution, the docker build
# command should be invoked from the top level directory of the Spark distribution. E.g.:
# docker build -t spark-executor-r:latest -f dockerfiles/executor-r/Dockerfile .

ADD examples /opt/spark/examples
ADD R /opt/spark/R

RUN apk add --no-cache R R-dev

ENV R_HOME /usr/lib/R

CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \
env | grep SPARK_JAVA_OPT_ | sed 's/[^=]*=\(.*\)/\1/g' > /tmp/java_opts.txt && \
readarray -t SPARK_EXECUTOR_JAVA_OPTS < /tmp/java_opts.txt && \
if ! [ -z ${SPARK_MOUNTED_CLASSPATH}+x} ]; then SPARK_CLASSPATH="$SPARK_MOUNTED_CLASSPATH:$SPARK_CLASSPATH"; fi && \
if ! [ -z ${SPARK_EXECUTOR_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXECUTOR_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
if ! [ -z ${SPARK_MOUNTED_FILES_DIR+x} ]; then cp -R "$SPARK_MOUNTED_FILES_DIR/." .; fi && \
if ! [ -z ${SPARK_MOUNTED_FILES_FROM_SECRET_DIR+x} ]; then cp -R "$SPARK_MOUNTED_FILES_FROM_SECRET_DIR/." .; fi && \
${JAVA_HOME}/bin/java "${SPARK_EXECUTOR_JAVA_OPTS[@]}" -Dspark.executor.port=$SPARK_EXECUTOR_PORT -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp $SPARK_CLASSPATH org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP
Loading