diff --git a/resource-managers/kubernetes/README.md b/resource-managers/kubernetes/README.md index 685ff343fa3be..31b721d193362 100644 --- a/resource-managers/kubernetes/README.md +++ b/resource-managers/kubernetes/README.md @@ -14,11 +14,14 @@ important matters to keep in mind when developing this feature. # Building Spark with Kubernetes Support -To build Spark with Kubernetes support, use the `kubernetes` profile when invoking Maven. For example, to simply compile -the Kubernetes core implementation module along with its dependencies: +To build Spark with Kubernetes support, use the `kubernetes` profile when invoking Maven. For example, to simply compile the Kubernetes core implementation module along with its dependencies: build/mvn compile -Pkubernetes -pl resource-managers/kubernetes/core -am -DskipTests +If this is the first time you compile the Kubernetes core implementation module, run the following command to install the dependencies and compile: + + build/mvn install -Pkubernetes -pl resource-managers/kubernetes/core -am -DskipTests + To build a distribution of Spark with Kubernetes support, use the `dev/make-distribution.sh` script, and add the `kubernetes` profile as part of the build arguments. Any other build arguments can be specified as one would expect when building Spark normally. For example, to build Spark against Hadoop 2.7 and Kubernetes: diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala index 6e1633f6a63cb..53a184cba7a4d 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala @@ -450,6 +450,22 @@ package object config extends Logging { .timeConf(TimeUnit.MINUTES) .createWithDefault(5) + private[spark] val EXECUTOR_SUBMITTED_SMALL_FILES_SECRET = + ConfigBuilder("spark.kubernetes.mountdependencies.smallfiles.executor.secretName") + .doc("Name of the secret that should be mounted into the executor containers for" + + " distributing submitted small files without the resource staging server.") + .internal() + .stringConf + .createOptional + + private[spark] val EXECUTOR_SUBMITTED_SMALL_FILES_SECRET_MOUNT_PATH = + ConfigBuilder("spark.kubernetes.mountdependencies.smallfiles.executor.secretMountPath") + .doc(s"Mount path in the executors for the secret given by" + + s" ${EXECUTOR_SUBMITTED_SMALL_FILES_SECRET.key}") + .internal() + .stringConf + .createOptional + private[spark] val EXECUTOR_INIT_CONTAINER_CONFIG_MAP = ConfigBuilder("spark.kubernetes.initcontainer.executor.configmapname") .doc("Name of the config map to use in the init-container that retrieves submitted files" + diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala index a377cc4942ad4..3c4b451a271f2 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala @@ -70,6 +70,7 @@ package object constants { private[spark] val ENV_PYSPARK_FILES = "PYSPARK_FILES" private[spark] val ENV_PYSPARK_PRIMARY = "PYSPARK_PRIMARY" private[spark] val ENV_JAVA_OPT_PREFIX = "SPARK_JAVA_OPT_" + 
private[spark] val ENV_MOUNTED_FILES_FROM_SECRET_DIR = "SPARK_MOUNTED_FILES_FROM_SECRET_DIR" // Bootstrapping dependencies with the init-container private[spark] val INIT_CONTAINER_ANNOTATION = "pod.beta.kubernetes.io/init-containers" @@ -92,6 +93,9 @@ package object constants { private[spark] val DEFAULT_SHUFFLE_MOUNT_NAME = "shuffle" private[spark] val INIT_CONTAINER_SECRET_VOLUME_NAME = "spark-init-secret" + // Bootstrapping dependencies via a secret + private[spark] val MOUNTED_SMALL_FILES_SECRET_MOUNT_PATH = "/etc/spark-submitted-files" + // Miscellaneous private[spark] val ANNOTATION_EXECUTOR_NODE_AFFINITY = "scheduler.alpha.kubernetes.io/affinity" private[spark] val DRIVER_CONTAINER_NAME = "spark-kubernetes-driver" diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverConfigurationStepsOrchestrator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverConfigurationStepsOrchestrator.scala index 82abe55ac6989..b66da0b154698 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverConfigurationStepsOrchestrator.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverConfigurationStepsOrchestrator.scala @@ -20,7 +20,7 @@ import org.apache.spark.SparkConf import org.apache.spark.deploy.kubernetes.ConfigurationUtils import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.kubernetes.constants._ -import org.apache.spark.deploy.kubernetes.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, InitContainerBootstrapStep, PythonStep} +import org.apache.spark.deploy.kubernetes.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, InitContainerBootstrapStep, MountSmallLocalFilesStep, PythonStep} import org.apache.spark.deploy.kubernetes.submit.submitsteps.initcontainer.InitContainerConfigurationStepsOrchestrator import org.apache.spark.launcher.SparkLauncher import org.apache.spark.util.Utils @@ -99,40 +99,77 @@ private[spark] class DriverConfigurationStepsOrchestrator( Option(new PythonStep(mainPyResource, additionalPythonFiles, filesDownloadPath)) case _ => Option.empty[DriverConfigurationStep] } - val initContainerBootstrapStep = if ((sparkJars ++ sparkFiles).exists { uri => - Option(Utils.resolveURI(uri).getScheme).getOrElse("file") != "local" - }) { - val initContainerConfigurationStepsOrchestrator = - new InitContainerConfigurationStepsOrchestrator( - namespace, - kubernetesResourceNamePrefix, - sparkJars, + + val (localFilesDownloadPath, submittedDependenciesBootstrapSteps) = + if (areAnyFilesNonContainerLocal(sparkJars ++ sparkFiles)) { + val (submittedLocalFilesDownloadPath, + sparkFilesResolvedFromInitContainer, + mountSmallFilesWithoutInitContainerStep) = + // If the resource staging server is specified, submit all local files through that. + submissionSparkConf.get(RESOURCE_STAGING_SERVER_URI).map { _ => + (filesDownloadPath, sparkFiles, Option.empty[DriverConfigurationStep]) + }.getOrElse { + // Else - use a small files bootstrap that submits the local files via a secret. + // Then, indicate to the outer block that the init-container should not handle + // those local files simply by filtering them out. 
+ val sparkFilesWithoutLocal = KubernetesFileUtils.getNonSubmitterLocalFiles(sparkFiles) + val smallFilesSecretName = s"${kubernetesAppId}-submitted-files" + val mountSmallFilesBootstrap = new MountSmallFilesBootstrapImpl( + smallFilesSecretName, MOUNTED_SMALL_FILES_SECRET_MOUNT_PATH) + val mountSmallLocalFilesStep = new MountSmallLocalFilesStep( sparkFiles, - jarsDownloadPath, - filesDownloadPath, - dockerImagePullPolicy, - allDriverLabels, + smallFilesSecretName, + MOUNTED_SMALL_FILES_SECRET_MOUNT_PATH, + mountSmallFilesBootstrap) + (MOUNTED_SMALL_FILES_SECRET_MOUNT_PATH, + sparkFilesWithoutLocal.toArray, + Some(mountSmallLocalFilesStep)) + } + + val initContainerBootstrapStep = + if (areAnyFilesNonContainerLocal(sparkJars ++ sparkFilesResolvedFromInitContainer)) { + val initContainerConfigurationStepsOrchestrator = + new InitContainerConfigurationStepsOrchestrator( + namespace, + kubernetesResourceNamePrefix, + sparkJars, + sparkFilesResolvedFromInitContainer, + jarsDownloadPath, + filesDownloadPath, + dockerImagePullPolicy, + allDriverLabels, + initContainerConfigMapName, + INIT_CONTAINER_CONFIG_MAP_KEY, + submissionSparkConf) + val initContainerConfigurationSteps = + initContainerConfigurationStepsOrchestrator.getAllConfigurationSteps() + Some(new InitContainerBootstrapStep(initContainerConfigurationSteps, initContainerConfigMapName, - INIT_CONTAINER_CONFIG_MAP_KEY, - submissionSparkConf) - val initContainerConfigurationSteps = - initContainerConfigurationStepsOrchestrator.getAllConfigurationSteps() - Some(new InitContainerBootstrapStep(initContainerConfigurationSteps, - initContainerConfigMapName, - INIT_CONTAINER_CONFIG_MAP_KEY)) + INIT_CONTAINER_CONFIG_MAP_KEY)) + } else Option.empty[DriverConfigurationStep] + (submittedLocalFilesDownloadPath, + mountSmallFilesWithoutInitContainerStep.toSeq ++ + initContainerBootstrapStep.toSeq) } else { - Option.empty[DriverConfigurationStep] + (filesDownloadPath, Seq.empty[DriverConfigurationStep]) } val dependencyResolutionStep = new DependencyResolutionStep( sparkJars, sparkFiles, jarsDownloadPath, - filesDownloadPath) + localFilesDownloadPath) Seq( initialSubmissionStep, kubernetesCredentialsStep, dependencyResolutionStep) ++ - initContainerBootstrapStep.toSeq ++ + submittedDependenciesBootstrapSteps ++ pythonStep.toSeq } + + private def areAnyFilesNonContainerLocal(files: Seq[String]): Boolean = { + files.exists { uri => + Option(Utils.resolveURI(uri).getScheme).getOrElse("file") != "local" + } + } + } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/MountSmallFilesBootstrap.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/MountSmallFilesBootstrap.scala new file mode 100644 index 0000000000000..79919b511ec37 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/MountSmallFilesBootstrap.scala @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.kubernetes.submit + +import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, Pod, PodBuilder} + +import org.apache.spark.deploy.kubernetes.constants._ + +private[spark] trait MountSmallFilesBootstrap { + def mountSmallFilesSecret(pod: Pod, container: Container): (Pod, Container) +} + +private[spark] class MountSmallFilesBootstrapImpl( + secretName: String, secretMountPath: String) extends MountSmallFilesBootstrap { + def mountSmallFilesSecret(pod: Pod, container: Container): (Pod, Container) = { + val resolvedPod = new PodBuilder(pod) + .editOrNewSpec() + .addNewVolume() + .withName("submitted-files") + .withNewSecret() + .withSecretName(secretName) + .endSecret() + .endVolume() + .endSpec() + .build() + val resolvedContainer = new ContainerBuilder(container) + .addNewEnv() + .withName(ENV_MOUNTED_FILES_FROM_SECRET_DIR) + .withValue(secretMountPath) + .endEnv() + .addNewVolumeMount() + .withName("submitted-files") + .withMountPath(secretMountPath) + .endVolumeMount() + .build() + (resolvedPod, resolvedContainer) + } +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/DependencyResolutionStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/DependencyResolutionStep.scala index dddc62410d6c9..090240420119e 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/DependencyResolutionStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/DependencyResolutionStep.scala @@ -23,6 +23,7 @@ import io.fabric8.kubernetes.api.model.ContainerBuilder import org.apache.spark.deploy.kubernetes.constants._ import org.apache.spark.deploy.kubernetes.submit.KubernetesFileUtils +import org.apache.spark.deploy.kubernetes.submit.submitsteps.initcontainer.InitContainerConfigurationStep import org.apache.spark.util.Utils /** @@ -36,11 +37,12 @@ private[spark] class DependencyResolutionStep( sparkJars: Seq[String], sparkFiles: Seq[String], jarsDownloadPath: String, - filesDownloadPath: String) extends DriverConfigurationStep { + localFilesDownloadPath: String) extends DriverConfigurationStep { override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = { val resolvedSparkJars = KubernetesFileUtils.resolveSubmittedUris(sparkJars, jarsDownloadPath) - val resolvedSparkFiles = KubernetesFileUtils.resolveSubmittedUris(sparkFiles, filesDownloadPath) + val resolvedSparkFiles = KubernetesFileUtils.resolveSubmittedUris( + sparkFiles, localFilesDownloadPath) val sparkConfResolvedSparkDependencies = driverSpec.driverSparkConf.clone() if (resolvedSparkJars.nonEmpty) { sparkConfResolvedSparkDependencies.set("spark.jars", resolvedSparkJars.mkString(",")) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/InitContainerBootstrapStep.scala 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/InitContainerBootstrapStep.scala index 29cad18c484c0..b4248338cc8de 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/InitContainerBootstrapStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/InitContainerBootstrapStep.scala @@ -33,32 +33,32 @@ private[spark] class InitContainerBootstrapStep( override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = { var currentInitContainerSpec = InitContainerSpec( - initContainerProperties = Map.empty[String, String], - additionalDriverSparkConf = Map.empty[String, String], - initContainer = new ContainerBuilder().build(), - driverContainer = driverSpec.driverContainer, - podToInitialize = driverSpec.driverPod, - initContainerDependentResources = Seq.empty[HasMetadata]) + initContainerProperties = Map.empty[String, String], + additionalDriverSparkConf = Map.empty[String, String], + initContainer = new ContainerBuilder().build(), + driverContainer = driverSpec.driverContainer, + podToInitialize = driverSpec.driverPod, + initContainerDependentResources = Seq.empty[HasMetadata]) for (nextStep <- initContainerConfigurationSteps) { currentInitContainerSpec = nextStep.configureInitContainer(currentInitContainerSpec) } val configMap = PropertiesConfigMapFromScalaMapBuilder.buildConfigMap( - initContainerConfigMapName, - initContainerConfigMapKey, - currentInitContainerSpec.initContainerProperties) + initContainerConfigMapName, + initContainerConfigMapKey, + currentInitContainerSpec.initContainerProperties) val resolvedDriverSparkConf = driverSpec.driverSparkConf.clone() - .set(EXECUTOR_INIT_CONTAINER_CONFIG_MAP, initContainerConfigMapName) - .set(EXECUTOR_INIT_CONTAINER_CONFIG_MAP_KEY, initContainerConfigMapKey) - .setAll(currentInitContainerSpec.additionalDriverSparkConf) + .set(EXECUTOR_INIT_CONTAINER_CONFIG_MAP, initContainerConfigMapName) + .set(EXECUTOR_INIT_CONTAINER_CONFIG_MAP_KEY, initContainerConfigMapKey) + .setAll(currentInitContainerSpec.additionalDriverSparkConf) val resolvedDriverPod = InitContainerUtil.appendInitContainer( - currentInitContainerSpec.podToInitialize, currentInitContainerSpec.initContainer) + currentInitContainerSpec.podToInitialize, currentInitContainerSpec.initContainer) driverSpec.copy( - driverPod = resolvedDriverPod, - driverContainer = currentInitContainerSpec.driverContainer, - driverSparkConf = resolvedDriverSparkConf, - otherKubernetesResources = - driverSpec.otherKubernetesResources ++ - currentInitContainerSpec.initContainerDependentResources ++ - Seq(configMap)) + driverPod = resolvedDriverPod, + driverContainer = currentInitContainerSpec.driverContainer, + driverSparkConf = resolvedDriverSparkConf, + otherKubernetesResources = + driverSpec.otherKubernetesResources ++ + currentInitContainerSpec.initContainerDependentResources ++ + Seq(configMap)) } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/MountSmallLocalFilesStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/MountSmallLocalFilesStep.scala new file mode 100644 index 0000000000000..cd1b7f6b7eb7e --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/MountSmallLocalFilesStep.scala @@ -0,0 +1,73 @@ +/* + 
* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.kubernetes.submit.submitsteps + +import java.io.File + +import com.google.common.io.{BaseEncoding, Files} +import io.fabric8.kubernetes.api.model.SecretBuilder +import scala.collection.JavaConverters._ + +import org.apache.spark.deploy.kubernetes.config._ +import org.apache.spark.deploy.kubernetes.submit.{KubernetesFileUtils, MountSmallFilesBootstrap} +import org.apache.spark.util.Utils + +private[spark] class MountSmallLocalFilesStep( + sparkFiles: Seq[String], + smallFilesSecretName: String, + smallFilesSecretMountPath: String, + mountSmallFilesBootstrap: MountSmallFilesBootstrap) extends DriverConfigurationStep { + + import MountSmallLocalFilesStep._ + override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = { + val localFiles = KubernetesFileUtils.getOnlySubmitterLocalFiles(sparkFiles).map(new File(_)) + val totalSizeBytes = localFiles.map(_.length()).sum + val totalSizeBytesString = Utils.bytesToString(totalSizeBytes) + require(totalSizeBytes < MAX_SECRET_BUNDLE_SIZE_BYTES, + s"Total size of all files submitted must be less than $MAX_SECRET_BUNDLE_SIZE_BYTES_STRING" + + s" if you do not use a resource staging server. The total size of all submitted local" + + s" files is $totalSizeBytesString. 
Please install a resource staging server and configure" + + s" your application to use it via ${RESOURCE_STAGING_SERVER_URI.key}") + val localFileBase64Contents = localFiles.map { file => + val fileBase64 = BaseEncoding.base64().encode(Files.toByteArray(file)) + (file.getName, fileBase64) + }.toMap + val localFilesSecret = new SecretBuilder() + .withNewMetadata() + .withName(smallFilesSecretName) + .endMetadata() + .withData(localFileBase64Contents.asJava) + .build() + val (resolvedDriverPod, resolvedDriverContainer) = + mountSmallFilesBootstrap.mountSmallFilesSecret( + driverSpec.driverPod, driverSpec.driverContainer) + val resolvedSparkConf = driverSpec.driverSparkConf.clone() + .set(EXECUTOR_SUBMITTED_SMALL_FILES_SECRET, smallFilesSecretName) + .set(EXECUTOR_SUBMITTED_SMALL_FILES_SECRET_MOUNT_PATH, smallFilesSecretMountPath) + driverSpec.copy( + driverPod = resolvedDriverPod, + driverContainer = resolvedDriverContainer, + driverSparkConf = resolvedSparkConf, + otherKubernetesResources = driverSpec.otherKubernetesResources ++ Seq(localFilesSecret)) + } +} + +private[spark] object MountSmallLocalFilesStep { + val MAX_SECRET_BUNDLE_SIZE_BYTES = 10240 + val MAX_SECRET_BUNDLE_SIZE_BYTES_STRING = + Utils.bytesToString(MAX_SECRET_BUNDLE_SIZE_BYTES) +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala index fa0ecca3b4ee6..b89e81bcb0be9 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala @@ -24,6 +24,7 @@ import org.apache.spark.SparkContext import org.apache.spark.deploy.kubernetes.{InitContainerResourceStagingServerSecretPluginImpl, SparkKubernetesClientFactory, SparkPodInitContainerBootstrapImpl} import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.kubernetes.constants._ +import org.apache.spark.deploy.kubernetes.submit.MountSmallFilesBootstrapImpl import org.apache.spark.internal.Logging import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl} @@ -40,28 +41,30 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit override def createSchedulerBackend(sc: SparkContext, masterURL: String, scheduler: TaskScheduler) : SchedulerBackend = { val sparkConf = sc.getConf - val maybeConfigMap = sparkConf.get(EXECUTOR_INIT_CONTAINER_CONFIG_MAP) - val maybeConfigMapKey = sparkConf.get(EXECUTOR_INIT_CONTAINER_CONFIG_MAP_KEY) + val maybeInitContainerConfigMap = sparkConf.get(EXECUTOR_INIT_CONTAINER_CONFIG_MAP) + val maybeInitContainerConfigMapKey = sparkConf.get(EXECUTOR_INIT_CONTAINER_CONFIG_MAP_KEY) + val maybeSubmittedFilesSecret = sparkConf.get(EXECUTOR_SUBMITTED_SMALL_FILES_SECRET) + val maybeSubmittedFilesSecretMountPath = sparkConf.get( + EXECUTOR_SUBMITTED_SMALL_FILES_SECRET_MOUNT_PATH) val maybeExecutorInitContainerSecretName = sparkConf.get(EXECUTOR_INIT_CONTAINER_SECRET) - val maybeExecutorInitContainerSecretMount = + val maybeExecutorInitContainerSecretMountPath = sparkConf.get(EXECUTOR_INIT_CONTAINER_SECRET_MOUNT_DIR) val executorInitContainerSecretVolumePlugin = for { initContainerSecretName <- maybeExecutorInitContainerSecretName - 
initContainerSecretMountPath <- maybeExecutorInitContainerSecretMount + initContainerSecretMountPath <- maybeExecutorInitContainerSecretMountPath } yield { new InitContainerResourceStagingServerSecretPluginImpl( initContainerSecretName, initContainerSecretMountPath) } // Only set up the bootstrap if they've provided both the config map key and the config map - // name. Note that we generally expect both to have been set from spark-submit V2, but for - // testing developers may simply run the driver JVM locally, but the config map won't be set - // then. - val bootStrap = for { - configMap <- maybeConfigMap - configMapKey <- maybeConfigMapKey + // name. The config map might not be provided if init-containers aren't being used to + // bootstrap dependencies. + val executorInitContainerbootStrap = for { + configMap <- maybeInitContainerConfigMap + configMapKey <- maybeInitContainerConfigMapKey } yield { new SparkPodInitContainerBootstrapImpl( sparkConf.get(INIT_CONTAINER_DOCKER_IMAGE), @@ -72,11 +75,17 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit configMap, configMapKey) } - if (maybeConfigMap.isEmpty) { + val mountSmallFilesBootstrap = for { + secretName <- maybeSubmittedFilesSecret + secretMountPath <- maybeSubmittedFilesSecretMountPath + } yield { + new MountSmallFilesBootstrapImpl(secretName, secretMountPath) + } + if (maybeInitContainerConfigMap.isEmpty) { logWarning("The executor's init-container config map was not specified. Executors will" + " therefore not attempt to fetch remote or submitted dependencies.") } - if (maybeConfigMapKey.isEmpty) { + if (maybeInitContainerConfigMapKey.isEmpty) { logWarning("The executor's init-container config map key was not specified. Executors will" + " therefore not attempt to fetch remote or submitted dependencies.") } @@ -90,8 +99,9 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit new KubernetesClusterSchedulerBackend( sc.taskScheduler.asInstanceOf[TaskSchedulerImpl], sc, - bootStrap, + executorInitContainerbootStrap, executorInitContainerSecretVolumePlugin, + mountSmallFilesBootstrap, kubernetesClient) } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala index 06de690800492..8876dfe722b88 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala @@ -22,22 +22,21 @@ import java.util.Collections import java.util.concurrent.{ConcurrentHashMap, TimeUnit} import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, AtomicReference} -import scala.collection.{concurrent, mutable} -import scala.collection.JavaConverters._ -import scala.concurrent.{ExecutionContext, Future} - import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.scala.DefaultScalaModule import io.fabric8.kubernetes.api.model._ import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watcher} import io.fabric8.kubernetes.client.Watcher.Action import org.apache.commons.io.FilenameUtils +import scala.collection.{concurrent, mutable} +import scala.collection.JavaConverters._ +import 
scala.concurrent.{ExecutionContext, Future} import org.apache.spark.{SparkContext, SparkEnv, SparkException} import org.apache.spark.deploy.kubernetes.{ConfigurationUtils, InitContainerResourceStagingServerSecretPlugin, PodWithDetachedInitContainer, SparkPodInitContainerBootstrap} import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.kubernetes.constants._ -import org.apache.spark.deploy.kubernetes.submit.InitContainerUtil +import org.apache.spark.deploy.kubernetes.submit.{InitContainerUtil, MountSmallFilesBootstrap} import org.apache.spark.internal.config import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.network.shuffle.kubernetes.KubernetesExternalShuffleClient @@ -52,6 +51,7 @@ private[spark] class KubernetesClusterSchedulerBackend( val sc: SparkContext, executorInitContainerBootstrap: Option[SparkPodInitContainerBootstrap], executorMountInitContainerSecretPlugin: Option[InitContainerResourceStagingServerSecretPlugin], + mountSmallFilesBootstrap: Option[MountSmallFilesBootstrap], kubernetesClient: KubernetesClient) extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) { @@ -457,16 +457,7 @@ private[spark] class KubernetesClusterSchedulerBackend( .withValue(cp) .build() } - val executorExtraJavaOptionsEnv = conf - .get(org.apache.spark.internal.config.EXECUTOR_JAVA_OPTIONS) - .map { opts => - val delimitedOpts = Utils.splitCommandString(opts) - delimitedOpts.zipWithIndex.map { - case (opt, index) => - new EnvVarBuilder().withName(s"$ENV_JAVA_OPT_PREFIX$index").withValue(opt).build() - } - }.getOrElse(Seq.empty[EnvVar]) - val executorEnv = (Seq( + val requiredEnv = (Seq( (ENV_EXECUTOR_PORT, executorPort.toString), (ENV_DRIVER_URL, driverUrl), // Executor backend expects integral value for executor cores, so round it up to an int. 
@@ -486,7 +477,7 @@ private[spark] class KubernetesClusterSchedulerBackend( .withNewFieldRef("v1", "status.podIP") .build()) .build() - ) ++ executorExtraJavaOptionsEnv ++ executorExtraClasspathEnv.toSeq + ) val requiredPorts = Seq( (EXECUTOR_PORT_NAME, executorPort), (BLOCK_MANAGER_PORT_NAME, blockmanagerPort)) @@ -506,7 +497,8 @@ private[spark] class KubernetesClusterSchedulerBackend( .addToLimits("memory", executorMemoryLimitQuantity) .addToRequests("cpu", executorCpuQuantity) .endResources() - .addAllToEnv(executorEnv.asJava) + .addAllToEnv(requiredEnv.asJava) + .addToEnv(executorExtraClasspathEnv.toSeq: _*) .withPorts(requiredPorts.asJava) .build() @@ -567,13 +559,18 @@ private[spark] class KubernetesClusterSchedulerBackend( .build() } }.getOrElse(executorPod) + val (withMaybeSmallFilesMountedPod, withMaybeSmallFilesMountedContainer) = + mountSmallFilesBootstrap.map { bootstrap => + bootstrap.mountSmallFilesSecret( + withMaybeShuffleConfigPod, withMaybeShuffleConfigExecutorContainer) + }.getOrElse((withMaybeShuffleConfigPod, withMaybeShuffleConfigExecutorContainer)) val (executorPodWithInitContainer, initBootstrappedExecutorContainer) = executorInitContainerBootstrap.map { bootstrap => val podWithDetachedInitContainer = bootstrap.bootstrapInitContainerAndVolumes( PodWithDetachedInitContainer( - withMaybeShuffleConfigPod, + withMaybeSmallFilesMountedPod, new ContainerBuilder().build(), - withMaybeShuffleConfigExecutorContainer)) + withMaybeSmallFilesMountedContainer)) val resolvedInitContainer = executorMountInitContainerSecretPlugin.map { plugin => plugin.mountResourceStagingServerSecretIntoInitContainer( @@ -588,7 +585,7 @@ private[spark] class KubernetesClusterSchedulerBackend( }.getOrElse(podWithAttachedInitContainer) (resolvedPodWithMountedSecret, podWithDetachedInitContainer.mainContainer) - }.getOrElse((withMaybeShuffleConfigPod, withMaybeShuffleConfigExecutorContainer)) + }.getOrElse((withMaybeSmallFilesMountedPod, withMaybeSmallFilesMountedContainer)) val executorPodWithNodeAffinity = addNodeAffinityAnnotationIfUseful( executorPodWithInitContainer, nodeToLocalTaskCount) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/DriverConfigurationStepsOrchestratorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/DriverConfigurationStepsOrchestratorSuite.scala index e4f221ad99cc5..c168e7b5407ba 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/DriverConfigurationStepsOrchestratorSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/DriverConfigurationStepsOrchestratorSuite.scala @@ -17,7 +17,8 @@ package org.apache.spark.deploy.kubernetes.submit import org.apache.spark.{SparkConf, SparkFunSuite} -import org.apache.spark.deploy.kubernetes.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverKubernetesCredentialsStep, InitContainerBootstrapStep, PythonStep} +import org.apache.spark.deploy.kubernetes.config._ +import org.apache.spark.deploy.kubernetes.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, InitContainerBootstrapStep, MountSmallLocalFilesStep, PythonStep} private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunSuite { @@ -31,7 +32,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS test("Base 
submission steps without an init-container or python files.") { val sparkConf = new SparkConf(false) - .set("spark.jars", "local:///var/apps/jars/jar1.jar") + .set("spark.jars", "local:///var/apps/jars/jar1.jar") val mainAppResource = JavaMainAppResource("local:///var/apps/jars/main.jar") val orchestrator = new DriverConfigurationStepsOrchestrator( NAMESPACE, @@ -43,16 +44,17 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS APP_ARGS, ADDITIONAL_PYTHON_FILES, sparkConf) - val steps = orchestrator.getAllConfigurationSteps() - assert(steps.size === 3) - assert(steps(0).isInstanceOf[BaseDriverConfigurationStep]) - assert(steps(1).isInstanceOf[DriverKubernetesCredentialsStep]) - assert(steps(2).isInstanceOf[DependencyResolutionStep]) + validateStepTypes( + orchestrator, + classOf[BaseDriverConfigurationStep], + classOf[DriverKubernetesCredentialsStep], + classOf[DependencyResolutionStep]) } test("Submission steps with an init-container.") { val sparkConf = new SparkConf(false) - .set("spark.jars", "hdfs://localhost:9000/var/apps/jars/jar1.jar") + .set("spark.jars", "hdfs://localhost:9000/var/apps/jars/jar1.jar") + .set(RESOURCE_STAGING_SERVER_URI, "https://localhost:8080") val mainAppResource = JavaMainAppResource("local:///var/apps/jars/main.jar") val orchestrator = new DriverConfigurationStepsOrchestrator( NAMESPACE, @@ -64,12 +66,12 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS APP_ARGS, ADDITIONAL_PYTHON_FILES, sparkConf) - val steps = orchestrator.getAllConfigurationSteps() - assert(steps.size === 4) - assert(steps(0).isInstanceOf[BaseDriverConfigurationStep]) - assert(steps(1).isInstanceOf[DriverKubernetesCredentialsStep]) - assert(steps(2).isInstanceOf[DependencyResolutionStep]) - assert(steps(3).isInstanceOf[InitContainerBootstrapStep]) + validateStepTypes( + orchestrator, + classOf[BaseDriverConfigurationStep], + classOf[DriverKubernetesCredentialsStep], + classOf[DependencyResolutionStep], + classOf[InitContainerBootstrapStep]) } test("Submission steps with python files.") { @@ -85,11 +87,40 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS APP_ARGS, ADDITIONAL_PYTHON_FILES, sparkConf) + validateStepTypes( + orchestrator, + classOf[BaseDriverConfigurationStep], + classOf[DriverKubernetesCredentialsStep], + classOf[DependencyResolutionStep], + classOf[PythonStep]) + } + + test("Only local files without a resource staging server.") { + val sparkConf = new SparkConf(false).set("spark.files", "/var/spark/file1.txt") + val mainAppResource = JavaMainAppResource("local:///var/apps/jars/main.jar") + val orchestrator = new DriverConfigurationStepsOrchestrator( + NAMESPACE, + APP_ID, + LAUNCH_TIME, + mainAppResource, + APP_NAME, + MAIN_CLASS, + APP_ARGS, + ADDITIONAL_PYTHON_FILES, + sparkConf) + validateStepTypes( + orchestrator, + classOf[BaseDriverConfigurationStep], + classOf[DriverKubernetesCredentialsStep], + classOf[DependencyResolutionStep], + classOf[MountSmallLocalFilesStep]) + } + + private def validateStepTypes( + orchestrator: DriverConfigurationStepsOrchestrator, + types: Class[_ <: DriverConfigurationStep]*): Unit = { val steps = orchestrator.getAllConfigurationSteps() - assert(steps.size === 4) - assert(steps(0).isInstanceOf[BaseDriverConfigurationStep]) - assert(steps(1).isInstanceOf[DriverKubernetesCredentialsStep]) - assert(steps(2).isInstanceOf[DependencyResolutionStep]) - assert(steps(3).isInstanceOf[PythonStep]) + assert(steps.size === types.size) + 
assert(steps.map(_.getClass) === types) } } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/MountSmallLocalFilesStepTest.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/MountSmallLocalFilesStepTest.scala new file mode 100644 index 0000000000000..11744ef409c20 --- /dev/null +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/MountSmallLocalFilesStepTest.scala @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.kubernetes.submit.submitsteps + +import java.io.{File, RandomAccessFile} + +import com.google.common.base.Charsets +import com.google.common.io.{BaseEncoding, Files} +import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, HasMetadata, Pod, PodBuilder, Secret} +import org.junit.Test +import org.mockito.{Mock, MockitoAnnotations} +import org.scalatest.BeforeAndAfter +import scala.collection.JavaConverters._ + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.deploy.kubernetes.config._ +import org.apache.spark.deploy.kubernetes.constants._ +import org.apache.spark.deploy.kubernetes.submit.MountSmallFilesBootstrap +import org.apache.spark.util.Utils + +private[spark] class MountSmallLocalFilesStepTest extends SparkFunSuite with BeforeAndAfter { + + private val FIRST_TEMP_FILE_NAME = "file1.txt" + private val SECOND_TEMP_FILE_NAME = "file2.txt" + private val FIRST_TEMP_FILE_CONTENTS = "123" + private val SECOND_TEMP_FILE_CONTENTS = "456" + private val REMOTE_FILE_URI = "hdfs://localhost:9000/file3.txt" + private val SECRET_NAME = "secret" + + private var tempFolder: File = _ + + private val mountSmallFilesBootstrap = new DummyMountSmallFilesBootstrap + + before { + MockitoAnnotations.initMocks(this) + tempFolder = Utils.createTempDir() + } + + after { + tempFolder.delete() + } + + test("Local files should be added to the secret.") { + val firstTempFile = createTempFileWithContents( + tempFolder, FIRST_TEMP_FILE_NAME, FIRST_TEMP_FILE_CONTENTS) + val secondTempFile = createTempFileWithContents( + tempFolder, SECOND_TEMP_FILE_NAME, SECOND_TEMP_FILE_CONTENTS) + val sparkFiles = Seq( + firstTempFile.getAbsolutePath, + secondTempFile.getAbsolutePath, + REMOTE_FILE_URI) + val configurationStep = new MountSmallLocalFilesStep( + sparkFiles, + SECRET_NAME, + MOUNTED_SMALL_FILES_SECRET_MOUNT_PATH, + mountSmallFilesBootstrap) + val baseDriverSpec = new KubernetesDriverSpec( + new PodBuilder().build(), + new ContainerBuilder().build(), + Seq.empty[HasMetadata], + new SparkConf(false)) + val configuredDriverSpec = configurationStep.configureDriver(baseDriverSpec) + 
assert(configuredDriverSpec.otherKubernetesResources.size === 1) + assert(configuredDriverSpec.otherKubernetesResources(0).isInstanceOf[Secret]) + val localFilesSecret = configuredDriverSpec.otherKubernetesResources(0).asInstanceOf[Secret] + assert(localFilesSecret.getMetadata.getName === SECRET_NAME) + val expectedSecretContents = Map( + FIRST_TEMP_FILE_NAME -> BaseEncoding.base64().encode( + FIRST_TEMP_FILE_CONTENTS.getBytes(Charsets.UTF_8)), + SECOND_TEMP_FILE_NAME -> BaseEncoding.base64().encode( + SECOND_TEMP_FILE_CONTENTS.getBytes(Charsets.UTF_8))) + assert(localFilesSecret.getData.asScala === expectedSecretContents) + assert(configuredDriverSpec.driverPod.getMetadata.getLabels.asScala === + Map(mountSmallFilesBootstrap.LABEL_KEY -> mountSmallFilesBootstrap.LABEL_VALUE)) + assert(configuredDriverSpec.driverContainer.getEnv.size() === 1) + assert(configuredDriverSpec.driverContainer.getEnv.get(0).getName === + mountSmallFilesBootstrap.ENV_KEY) + assert(configuredDriverSpec.driverContainer.getEnv.get(0).getValue === + mountSmallFilesBootstrap.ENV_VALUE) + assert(configuredDriverSpec.driverSparkConf.get( + EXECUTOR_SUBMITTED_SMALL_FILES_SECRET) === + Some(SECRET_NAME)) + assert(configuredDriverSpec.driverSparkConf.get( + EXECUTOR_SUBMITTED_SMALL_FILES_SECRET_MOUNT_PATH) === + Some(MOUNTED_SMALL_FILES_SECRET_MOUNT_PATH)) + } + + test("Using large files should throw an exception.") { + val largeTempFileContents = BaseEncoding.base64().encode(new Array[Byte](10241)) + val largeTempFile = createTempFileWithContents(tempFolder, "large.txt", largeTempFileContents) + val configurationStep = new MountSmallLocalFilesStep( + Seq(largeTempFile.getAbsolutePath), + SECRET_NAME, + MOUNTED_SMALL_FILES_SECRET_MOUNT_PATH, + mountSmallFilesBootstrap) + val baseDriverSpec = new KubernetesDriverSpec( + new PodBuilder().build(), + new ContainerBuilder().build(), + Seq.empty[HasMetadata], + new SparkConf(false)) + try { + configurationStep.configureDriver(baseDriverSpec) + fail("Using the small local files mounter should not be allowed with big files.") + } catch { + case e: Throwable => + assert(e.getMessage === + s"requirement failed: Total size of all files submitted must be less than" + + s" ${MountSmallLocalFilesStep.MAX_SECRET_BUNDLE_SIZE_BYTES_STRING} if you do not" + + s" use a resource staging server. The total size of all submitted local" + + s" files is ${Utils.bytesToString(largeTempFile.length())}. 
Please install a" + + s" resource staging server and configure your application to use it via" + + s" ${RESOURCE_STAGING_SERVER_URI.key}" + ) + } + } + + private def createTempFileWithContents( + root: File, + fileName: String, + fileContents: String): File = { + val tempFile = new File(root, fileName) + tempFile.createNewFile() + Files.write(fileContents, tempFile, Charsets.UTF_8) + tempFile + } + + private class DummyMountSmallFilesBootstrap extends MountSmallFilesBootstrap { + val LABEL_KEY = "smallFilesLabelKey" + val LABEL_VALUE = "smallFilesLabelValue" + val ENV_KEY = "smallFilesEnvKey" + val ENV_VALUE = "smallFilesEnvValue" + + override def mountSmallFilesSecret(pod: Pod, container: Container): (Pod, Container) = { + val editedPod = new PodBuilder(pod) + .editOrNewMetadata() + .addToLabels(LABEL_KEY, LABEL_VALUE) + .endMetadata() + .build() + val editedContainer = new ContainerBuilder(container) + .addNewEnv() + .withName(ENV_KEY) + .withValue(ENV_VALUE) + .endEnv() + .build() + (editedPod, editedContainer) + } + } +} diff --git a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver-py/Dockerfile b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver-py/Dockerfile index 41e4b31446c59..7b1effa911f19 100644 --- a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver-py/Dockerfile +++ b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver-py/Dockerfile @@ -44,5 +44,6 @@ CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \ if ! [ -z ${SPARK_MOUNTED_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_MOUNTED_CLASSPATH:$SPARK_CLASSPATH"; fi && \ if ! [ -z ${SPARK_SUBMIT_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_SUBMIT_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \ if ! [ -z ${SPARK_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \ - if ! [ -z ${SPARK_MOUNTED_FILES_DIR} ]; then cp -R "$SPARK_MOUNTED_FILES_DIR/." .; fi && \ + if ! [ -z ${SPARK_MOUNTED_FILES_DIR+x} ]; then cp -R "$SPARK_MOUNTED_FILES_DIR/." .; fi && \ + if ! [ -z ${SPARK_MOUNTED_FILES_FROM_SECRET_DIR+x} ]; then cp -R "$SPARK_MOUNTED_FILES_FROM_SECRET_DIR/." .; fi && \ ${JAVA_HOME}/bin/java "${SPARK_DRIVER_JAVA_OPTS[@]}" -cp $SPARK_CLASSPATH -Xms$SPARK_DRIVER_MEMORY -Xmx$SPARK_DRIVER_MEMORY $SPARK_DRIVER_CLASS $PYSPARK_PRIMARY $PYSPARK_FILES $SPARK_DRIVER_ARGS diff --git a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver/Dockerfile b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver/Dockerfile index c2ff8f4f55822..26d1d805fde2b 100644 --- a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver/Dockerfile +++ b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver/Dockerfile @@ -30,4 +30,5 @@ CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \ if ! [ -z ${SPARK_SUBMIT_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_SUBMIT_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \ if ! [ -z ${SPARK_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \ if ! [ -z ${SPARK_MOUNTED_FILES_DIR+x} ]; then cp -R "$SPARK_MOUNTED_FILES_DIR/." .; fi && \ + if ! [ -z ${SPARK_MOUNTED_FILES_FROM_SECRET_DIR} ]; then cp -R "$SPARK_MOUNTED_FILES_FROM_SECRET_DIR/." 
.; fi && \ ${JAVA_HOME}/bin/java "${SPARK_DRIVER_JAVA_OPTS[@]}" -cp $SPARK_CLASSPATH -Xms$SPARK_DRIVER_MEMORY -Xmx$SPARK_DRIVER_MEMORY $SPARK_DRIVER_CLASS $SPARK_DRIVER_ARGS diff --git a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor-py/Dockerfile b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor-py/Dockerfile index cecb0c598d618..2b6aae6d7a3f2 100644 --- a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor-py/Dockerfile +++ b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor-py/Dockerfile @@ -38,10 +38,10 @@ ENV PYSPARK_PYTHON python ENV PYSPARK_DRIVER_PYTHON python ENV PYTHONPATH ${SPARK_HOME}/python/:${SPARK_HOME}/python/lib/py4j-0.10.4-src.zip:${PYTHONPATH} +# TODO support spark.executor.extraClassPath CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \ - env | grep SPARK_JAVA_OPT_ | sed 's/[^=]*=\(.*\)/\1/g' > /tmp/java_opts.txt && \ - readarray -t SPARK_EXECUTOR_JAVA_OPTS < /tmp/java_opts.txt && \ if ! [ -z ${SPARK_MOUNTED_CLASSPATH}+x} ]; then SPARK_CLASSPATH="$SPARK_MOUNTED_CLASSPATH:$SPARK_CLASSPATH"; fi && \ if ! [ -z ${SPARK_EXECUTOR_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXECUTOR_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \ if ! [ -z ${SPARK_MOUNTED_FILES_DIR} ]; then cp -R "$SPARK_MOUNTED_FILES_DIR/." .; fi && \ - ${JAVA_HOME}/bin/java "${SPARK_EXECUTOR_JAVA_OPTS[@]}" -Dspark.executor.port=$SPARK_EXECUTOR_PORT -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp $SPARK_CLASSPATH org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP + if ! [ -z ${SPARK_MOUNTED_FILES_FROM_SECRET_DIR} ]; then cp -R "$SPARK_MOUNTED_FILES_FROM_SECRET_DIR/." .; fi && \ + ${JAVA_HOME}/bin/java -Dspark.executor.port=$SPARK_EXECUTOR_PORT -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp $SPARK_CLASSPATH org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP diff --git a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor/Dockerfile b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor/Dockerfile index 437fb645a253d..e0d9b8245ecfc 100644 --- a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor/Dockerfile +++ b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor/Dockerfile @@ -23,11 +23,11 @@ FROM spark-base COPY examples /opt/spark/examples +# TODO support spark.executor.extraClassPath CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \ - env | grep SPARK_JAVA_OPT_ | sed 's/[^=]*=\(.*\)/\1/g' > /tmp/java_opts.txt && \ - readarray -t SPARK_EXECUTOR_JAVA_OPTS < /tmp/java_opts.txt && \ if ! [ -z ${SPARK_MOUNTED_CLASSPATH}+x} ]; then SPARK_CLASSPATH="$SPARK_MOUNTED_CLASSPATH:$SPARK_CLASSPATH"; fi && \ if ! [ -z ${SPARK_EXECUTOR_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXECUTOR_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \ if ! [ -z ${SPARK_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \ if ! [ -z ${SPARK_MOUNTED_FILES_DIR} ]; then cp -R "$SPARK_MOUNTED_FILES_DIR/." 
.; fi && \ - ${JAVA_HOME}/bin/java "${SPARK_EXECUTOR_JAVA_OPTS[@]}" -Dspark.executor.port=$SPARK_EXECUTOR_PORT -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp $SPARK_CLASSPATH org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP + if ! [ -z ${SPARK_MOUNTED_FILES_FROM_SECRET_DIR} ]; then cp -R "$SPARK_MOUNTED_FILES_FROM_SECRET_DIR/." .; fi && \ + ${JAVA_HOME}/bin/java -Dspark.executor.port=$SPARK_EXECUTOR_PORT -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp $SPARK_CLASSPATH org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP diff --git a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/init-container/Dockerfile b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/init-container/Dockerfile index 2ab3e6295b6d8..1178dd2428448 100644 --- a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/init-container/Dockerfile +++ b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/init-container/Dockerfile @@ -21,4 +21,4 @@ FROM spark-base # command should be invoked from the top level directory of the Spark distribution. E.g.: # docker build -t spark-init:latest -f dockerfiles/init-container/Dockerfile . -ENTRYPOINT [ "/opt/entrypoint.sh", "bin/spark-class", "org.apache.spark.deploy.rest.kubernetes.KubernetesSparkDependencyDownloadInitContainer" ] +ENTRYPOINT [ "/opt/entrypoint.sh", "/opt/spark/bin/spark-class", "org.apache.spark.deploy.rest.kubernetes.KubernetesSparkDependencyDownloadInitContainer" ] diff --git a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/resource-staging-server/Dockerfile b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/resource-staging-server/Dockerfile index 0e0c9dd31aad6..87ed7d10f3eb3 100644 --- a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/resource-staging-server/Dockerfile +++ b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/resource-staging-server/Dockerfile @@ -22,4 +22,4 @@ FROM spark-base # command should be invoked from the top level directory of the Spark distribution. E.g.: # docker build -t spark-resource-staging-server:latest -f dockerfiles/resource-staging-server/Dockerfile . -ENTRYPOINT [ "/opt/entrypoint.sh", "bin/spark-class", "org.apache.spark.deploy.rest.kubernetes.ResourceStagingServer" ] +ENTRYPOINT [ "/opt/entrypoint.sh", "/opt/spark/bin/spark-class", "org.apache.spark.deploy.rest.kubernetes.ResourceStagingServer" ] diff --git a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/shuffle-service/Dockerfile b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/shuffle-service/Dockerfile index 2ae0be4ee6c32..b76e66d316c5c 100644 --- a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/shuffle-service/Dockerfile +++ b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/shuffle-service/Dockerfile @@ -21,6 +21,4 @@ FROM spark-base # command should be invoked from the top level directory of the Spark distribution. E.g.: # docker build -t spark-shuffle:latest -f dockerfiles/shuffle-service/Dockerfile . 
-COPY examples /opt/spark/examples - -ENTRYPOINT [ "/opt/entrypoint.sh", "bin/spark-class", "org.apache.spark.deploy.kubernetes.KubernetesExternalShuffleService", "1" ] +ENTRYPOINT [ "/opt/entrypoint.sh", "/opt/spark/bin/spark-class", "org.apache.spark.deploy.kubernetes.KubernetesExternalShuffleService", "1" ] diff --git a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/spark-base/Dockerfile b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/spark-base/Dockerfile index a982fc5fd30ce..83d1efa7f3a5f 100644 --- a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/spark-base/Dockerfile +++ b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/spark-base/Dockerfile @@ -23,6 +23,7 @@ FROM openjdk:8-alpine RUN apk upgrade --no-cache && \ apk add --no-cache bash tini && \ mkdir -p /opt/spark && \ + mkdir -p /opt/spark/work-dir && \ touch /opt/spark/RELEASE && \ rm /bin/sh && \ ln -sv /bin/bash /bin/sh && \ @@ -36,6 +37,6 @@ COPY dockerfiles/spark-base/entrypoint.sh /opt/ ENV SPARK_HOME /opt/spark -WORKDIR /opt/spark +WORKDIR /opt/spark/work-dir ENTRYPOINT [ "/opt/entrypoint.sh" ] diff --git a/resource-managers/kubernetes/integration-tests-spark-jobs/src/main/scala/org/apache/spark/deploy/kubernetes/integrationtest/jobs/FileExistenceTest.scala b/resource-managers/kubernetes/integration-tests-spark-jobs/src/main/scala/org/apache/spark/deploy/kubernetes/integrationtest/jobs/FileExistenceTest.scala index 8994c998bffee..a9e328f4ff248 100644 --- a/resource-managers/kubernetes/integration-tests-spark-jobs/src/main/scala/org/apache/spark/deploy/kubernetes/integrationtest/jobs/FileExistenceTest.scala +++ b/resource-managers/kubernetes/integration-tests-spark-jobs/src/main/scala/org/apache/spark/deploy/kubernetes/integrationtest/jobs/FileExistenceTest.scala @@ -16,6 +16,7 @@ */ package org.apache.spark.deploy.kubernetes.integrationtest.jobs +import java.io.File import java.nio.file.Paths import com.google.common.base.Charsets @@ -45,6 +46,20 @@ private[spark] object FileExistenceTest { } else { println(s"File found at ${file.getAbsolutePath} with correct contents.") } + val spark = SparkSession.builder().getOrCreate().sparkContext + val fileNamesRdd = spark.parallelize(Seq(args(0))) + if (fileNamesRdd.filter(fileName => new File(fileName).isFile()).count() != 1) { + throw new SparkException(s"Executors do not have the file ${args(0)}.") + } + val matchingContents = fileNamesRdd.map { fileName => + Files.toString(new File(fileName), Charsets.UTF_8) + }.filter(_.equals(args(1))) + if (matchingContents.count() != 1) { + throw new SparkException(s"The file on the executors at ${args(0)} did not have" + + s" the correct contents.") + } + println(s"File found on the executors at the relative path ${args(0)} with the" + + s" correct contents.") // scalastyle:on println } while (true) { diff --git a/resource-managers/kubernetes/integration-tests-spark-jobs/src/main/scala/org/apache/spark/deploy/kubernetes/integrationtest/jobs/JavaOptionsTest.scala b/resource-managers/kubernetes/integration-tests-spark-jobs/src/main/scala/org/apache/spark/deploy/kubernetes/integrationtest/jobs/JavaOptionsTest.scala index 967032eddccb5..114f8ec0408fa 100644 --- a/resource-managers/kubernetes/integration-tests-spark-jobs/src/main/scala/org/apache/spark/deploy/kubernetes/integrationtest/jobs/JavaOptionsTest.scala +++ b/resource-managers/kubernetes/integration-tests-spark-jobs/src/main/scala/org/apache/spark/deploy/kubernetes/integrationtest/jobs/JavaOptionsTest.scala @@
-29,13 +29,12 @@ private[spark] object JavaOptionsTest { def main(args: Array[String]): Unit = { // scalastyle:off println - if (args.length != 2) { + if (args.length != 1) { println(s"Invalid arguments: ${args.mkString(",")}." + - s"Usage: JavaOptionsTest ") + s"Usage: JavaOptionsTest ") System.exit(1) } val expectedDriverJavaOptions = loadPropertiesFromFile(args(0)) - val expectedExecutorJavaOptions = loadPropertiesFromFile(args(1)) val nonMatchingDriverOptions = expectedDriverJavaOptions.filter { case (optKey, optValue) => System.getProperty(optKey) != optValue } @@ -43,37 +42,15 @@ private[spark] object JavaOptionsTest { println(s"The driver's JVM options did not match. Expected $expectedDriverJavaOptions." + s" But these options did not match: $nonMatchingDriverOptions.") val sysProps = Maps.fromProperties(System.getProperties).asScala - println("Driver system properties are:") + println("System properties are:") for (prop <- sysProps) { println(s"Key: ${prop._1}, Value: ${prop._2}") } System.exit(1) } - val spark = SparkSession.builder().getOrCreate().sparkContext - try { - val nonMatchingExecutorOptions = spark.parallelize(Seq(0)).flatMap { _ => - expectedExecutorJavaOptions.filter { - case (optKey, optValue) => System.getProperty(optKey) != optValue - } - }.collectAsMap() - if (nonMatchingExecutorOptions.nonEmpty) { - val executorSysProps = spark.parallelize(Seq(0)).flatMap { _ => - Maps.fromProperties(System.getProperties).asScala - }.collectAsMap() - println(s"The executor's JVM options did not match. Expected" + - s" $expectedExecutorJavaOptions. But these options did not" + - s" match: $nonMatchingExecutorOptions.") - println("Executor system properties are:") - for (prop <- executorSysProps) { - println(s"Key: ${prop._1}, Value: ${prop._2}") - } - } else { - println("All expected JVM options were present on the driver and executors.") - } - } finally { - spark.stop() - } + // TODO support spark.executor.extraJavaOptions and test here. 
+ println(s"All expected JVM options were present on the driver and executors.") // scalastyle:on println } diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala index 20e050a25b5cc..9c1f9775681e1 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala @@ -213,15 +213,18 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { test("Added files should be placed in the driver's working directory.") { assume(testBackend.name == MINIKUBE_TEST_BACKEND) + launchStagingServer(SSLOptions(), None) val testExistenceFileTempDir = Utils.createTempDir(namePrefix = "test-existence-file-temp-dir") val testExistenceFile = new File(testExistenceFileTempDir, "input.txt") Files.write(TEST_EXISTENCE_FILE_CONTENTS, testExistenceFile, Charsets.UTF_8) - launchStagingServer(SSLOptions(), None) sparkConf.set("spark.files", testExistenceFile.getAbsolutePath) runSparkApplicationAndVerifyCompletion( JavaMainAppResource(SUBMITTER_LOCAL_MAIN_APP_RESOURCE), FILE_EXISTENCE_MAIN_CLASS, - Seq(s"File found at /opt/spark/${testExistenceFile.getName} with correct contents."), + Seq( + s"File found at /opt/spark/work-dir/${testExistenceFile.getName} with correct contents.", + s"File found on the executors at the relative path ${testExistenceFile.getName} with" + + s" the correct contents."), Array(testExistenceFile.getName, TEST_EXISTENCE_FILE_CONTENTS), Seq.empty[String]) } @@ -231,31 +234,39 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { launchStagingServer(SSLOptions(), None) val driverJvmOptionsFile = storeJvmOptionsInTempFile( Map("simpleDriverConf" -> "simpleDriverConfValue", - "driverconfwithspaces" -> "driver conf with spaces value"), + "driverconfwithspaces" -> "driver conf with spaces value"), "driver-jvm-options.properties", "JVM options that should be set on the driver.") - val executorJvmOptionsFile = storeJvmOptionsInTempFile( - Map("simpleExecutorConf" -> "simpleExecutorConfValue", - "executor conf with spaces" -> "executor conf with spaces value"), - "executor-jvm-options.properties", - "JVM options that should be set on the executors.") sparkConf.set(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, "-DsimpleDriverConf=simpleDriverConfValue" + " -Ddriverconfwithspaces='driver conf with spaces value'") - sparkConf.set(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS, - "-DsimpleExecutorConf=simpleExecutorConfValue" + - " -D\'executor conf with spaces\'=\'executor conf with spaces value\'") - sparkConf.set("spark.files", - Seq(driverJvmOptionsFile.getAbsolutePath, executorJvmOptionsFile.getAbsolutePath) - .mkString(",")) + sparkConf.set("spark.files", driverJvmOptionsFile.getAbsolutePath) runSparkApplicationAndVerifyCompletion( JavaMainAppResource(SUBMITTER_LOCAL_MAIN_APP_RESOURCE), JAVA_OPTIONS_MAIN_CLASS, Seq(s"All expected JVM options were present on the driver and executors."), - Array(driverJvmOptionsFile.getName, executorJvmOptionsFile.getName), + Array(driverJvmOptionsFile.getName), Seq.empty[String]) } + test("Submit small local files without the resource staging server.") { + assume(testBackend.name 
== MINIKUBE_TEST_BACKEND) + sparkConf.setJars(Seq(CONTAINER_LOCAL_HELPER_JAR_PATH)) + val testExistenceFileTempDir = Utils.createTempDir(namePrefix = "test-existence-file-temp-dir") + val testExistenceFile = new File(testExistenceFileTempDir, "input.txt") + Files.write(TEST_EXISTENCE_FILE_CONTENTS, testExistenceFile, Charsets.UTF_8) + sparkConf.set("spark.files", testExistenceFile.getAbsolutePath) + runSparkApplicationAndVerifyCompletion( + JavaMainAppResource(CONTAINER_LOCAL_MAIN_APP_RESOURCE), + FILE_EXISTENCE_MAIN_CLASS, + Seq( + s"File found at /opt/spark/work-dir/${testExistenceFile.getName} with correct contents.", + s"File found on the executors at the relative path ${testExistenceFile.getName} with" + + s" the correct contents."), + Array(testExistenceFile.getName, TEST_EXISTENCE_FILE_CONTENTS), + Seq.empty[String]) + } + test("Use a very long application name.") { assume(testBackend.name == MINIKUBE_TEST_BACKEND)