diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala
index 6e1633f6a63cb..53a184cba7a4d 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala
@@ -450,6 +450,22 @@ package object config extends Logging {
       .timeConf(TimeUnit.MINUTES)
       .createWithDefault(5)
 
+  private[spark] val EXECUTOR_SUBMITTED_SMALL_FILES_SECRET =
+    ConfigBuilder("spark.kubernetes.mountdependencies.smallfiles.executor.secretName")
+      .doc("Name of the secret that should be mounted into the executor containers for" +
+        " distributing submitted small files without the resource staging server.")
+      .internal()
+      .stringConf
+      .createOptional
+
+  private[spark] val EXECUTOR_SUBMITTED_SMALL_FILES_SECRET_MOUNT_PATH =
+    ConfigBuilder("spark.kubernetes.mountdependencies.smallfiles.executor.secretMountPath")
+      .doc(s"Mount path in the executors for the secret given by" +
+        s" ${EXECUTOR_SUBMITTED_SMALL_FILES_SECRET.key}")
+      .internal()
+      .stringConf
+      .createOptional
+
   private[spark] val EXECUTOR_INIT_CONTAINER_CONFIG_MAP =
     ConfigBuilder("spark.kubernetes.initcontainer.executor.configmapname")
       .doc("Name of the config map to use in the init-container that retrieves submitted files" +
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala
index 92f051b2ac298..ac2ca34fb34c7 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala
@@ -69,6 +69,7 @@ package object constants {
   private[spark] val ENV_MOUNTED_FILES_DIR = "SPARK_MOUNTED_FILES_DIR"
   private[spark] val ENV_PYSPARK_FILES = "PYSPARK_FILES"
   private[spark] val ENV_PYSPARK_PRIMARY = "PYSPARK_PRIMARY"
+  private[spark] val ENV_MOUNTED_FILES_FROM_SECRET_DIR = "SPARK_MOUNTED_FILES_FROM_SECRET_DIR"
 
   // Bootstrapping dependencies with the init-container
   private[spark] val INIT_CONTAINER_ANNOTATION = "pod.beta.kubernetes.io/init-containers"
@@ -91,6 +92,9 @@ package object constants {
   private[spark] val DEFAULT_SHUFFLE_MOUNT_NAME = "shuffle"
   private[spark] val INIT_CONTAINER_SECRET_VOLUME_NAME = "spark-init-secret"
 
+  // Bootstrapping dependencies via a secret
+  private[spark] val MOUNTED_SMALL_FILES_SECRET_MOUNT_PATH = "/etc/spark-submitted-files"
+
   // Miscellaneous
   private[spark] val ANNOTATION_EXECUTOR_NODE_AFFINITY = "scheduler.alpha.kubernetes.io/affinity"
   private[spark] val DRIVER_CONTAINER_NAME = "spark-kubernetes-driver"
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverConfigurationStepsOrchestrator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverConfigurationStepsOrchestrator.scala
index 82abe55ac6989..b66da0b154698 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverConfigurationStepsOrchestrator.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverConfigurationStepsOrchestrator.scala
@@ -20,7 +20,7 @@ import org.apache.spark.SparkConf
 import org.apache.spark.deploy.kubernetes.ConfigurationUtils
 import org.apache.spark.deploy.kubernetes.config._
 import org.apache.spark.deploy.kubernetes.constants._
-import org.apache.spark.deploy.kubernetes.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, InitContainerBootstrapStep, PythonStep}
+import org.apache.spark.deploy.kubernetes.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, InitContainerBootstrapStep, MountSmallLocalFilesStep, PythonStep}
 import org.apache.spark.deploy.kubernetes.submit.submitsteps.initcontainer.InitContainerConfigurationStepsOrchestrator
 import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.util.Utils
@@ -99,40 +99,77 @@
         Option(new PythonStep(mainPyResource, additionalPythonFiles, filesDownloadPath))
       case _ => Option.empty[DriverConfigurationStep]
     }
-    val initContainerBootstrapStep = if ((sparkJars ++ sparkFiles).exists { uri =>
-      Option(Utils.resolveURI(uri).getScheme).getOrElse("file") != "local"
-    }) {
-      val initContainerConfigurationStepsOrchestrator =
-        new InitContainerConfigurationStepsOrchestrator(
-          namespace,
-          kubernetesResourceNamePrefix,
-          sparkJars,
+
+    val (localFilesDownloadPath, submittedDependenciesBootstrapSteps) =
+      if (areAnyFilesNonContainerLocal(sparkJars ++ sparkFiles)) {
+        val (submittedLocalFilesDownloadPath,
+          sparkFilesResolvedFromInitContainer,
+          mountSmallFilesWithoutInitContainerStep) =
+          // If the resource staging server is specified, submit all local files through that.
+          submissionSparkConf.get(RESOURCE_STAGING_SERVER_URI).map { _ =>
+            (filesDownloadPath, sparkFiles, Option.empty[DriverConfigurationStep])
+          }.getOrElse {
+            // Else - use a small files bootstrap that submits the local files via a secret.
+            // Then, indicate to the outer block that the init-container should not handle
+            // those local files simply by filtering them out.
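(Inline note on the branching above: whether a file is left alone, bundled into the small-files secret, or handed to the init-container comes down to its URI scheme. The sketch below mimics that classification using plain `java.net.URI` rather than Spark's `Utils.resolveURI` and `KubernetesFileUtils` helpers, so treat it as an approximation for illustration, not code from this patch.)

```scala
import java.net.URI

object UriClassificationSketch {
  // Spark treats a missing scheme as "file", i.e. a path on the submitter's machine.
  private def scheme(uri: String): String =
    Option(new URI(uri).getScheme).getOrElse("file")

  // "local" URIs already exist inside the Docker image, so nothing needs to be shipped.
  def isContainerLocal(uri: String): Boolean = scheme(uri) == "local"

  // Submitter-local files are the candidates for the small-files secret
  // (or for the resource staging server, when one is configured).
  def isSubmitterLocal(uri: String): Boolean = scheme(uri) == "file"

  def main(args: Array[String]): Unit = {
    Seq(
      "local:///var/apps/jars/jar1.jar",   // container-local: left alone
      "/var/spark/file1.txt",              // submitter-local: bundled into the secret
      "hdfs://localhost:9000/file3.txt"    // remote: still fetched by the init-container
    ).foreach { uri =>
      println(s"$uri container-local=${isContainerLocal(uri)} submitter-local=${isSubmitterLocal(uri)}")
    }
  }
}
```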
+ val sparkFilesWithoutLocal = KubernetesFileUtils.getNonSubmitterLocalFiles(sparkFiles) + val smallFilesSecretName = s"${kubernetesAppId}-submitted-files" + val mountSmallFilesBootstrap = new MountSmallFilesBootstrapImpl( + smallFilesSecretName, MOUNTED_SMALL_FILES_SECRET_MOUNT_PATH) + val mountSmallLocalFilesStep = new MountSmallLocalFilesStep( sparkFiles, - jarsDownloadPath, - filesDownloadPath, - dockerImagePullPolicy, - allDriverLabels, + smallFilesSecretName, + MOUNTED_SMALL_FILES_SECRET_MOUNT_PATH, + mountSmallFilesBootstrap) + (MOUNTED_SMALL_FILES_SECRET_MOUNT_PATH, + sparkFilesWithoutLocal.toArray, + Some(mountSmallLocalFilesStep)) + } + + val initContainerBootstrapStep = + if (areAnyFilesNonContainerLocal(sparkJars ++ sparkFilesResolvedFromInitContainer)) { + val initContainerConfigurationStepsOrchestrator = + new InitContainerConfigurationStepsOrchestrator( + namespace, + kubernetesResourceNamePrefix, + sparkJars, + sparkFilesResolvedFromInitContainer, + jarsDownloadPath, + filesDownloadPath, + dockerImagePullPolicy, + allDriverLabels, + initContainerConfigMapName, + INIT_CONTAINER_CONFIG_MAP_KEY, + submissionSparkConf) + val initContainerConfigurationSteps = + initContainerConfigurationStepsOrchestrator.getAllConfigurationSteps() + Some(new InitContainerBootstrapStep(initContainerConfigurationSteps, initContainerConfigMapName, - INIT_CONTAINER_CONFIG_MAP_KEY, - submissionSparkConf) - val initContainerConfigurationSteps = - initContainerConfigurationStepsOrchestrator.getAllConfigurationSteps() - Some(new InitContainerBootstrapStep(initContainerConfigurationSteps, - initContainerConfigMapName, - INIT_CONTAINER_CONFIG_MAP_KEY)) + INIT_CONTAINER_CONFIG_MAP_KEY)) + } else Option.empty[DriverConfigurationStep] + (submittedLocalFilesDownloadPath, + mountSmallFilesWithoutInitContainerStep.toSeq ++ + initContainerBootstrapStep.toSeq) } else { - Option.empty[DriverConfigurationStep] + (filesDownloadPath, Seq.empty[DriverConfigurationStep]) } val dependencyResolutionStep = new DependencyResolutionStep( sparkJars, sparkFiles, jarsDownloadPath, - filesDownloadPath) + localFilesDownloadPath) Seq( initialSubmissionStep, kubernetesCredentialsStep, dependencyResolutionStep) ++ - initContainerBootstrapStep.toSeq ++ + submittedDependenciesBootstrapSteps ++ pythonStep.toSeq } + + private def areAnyFilesNonContainerLocal(files: Seq[String]): Boolean = { + files.exists { uri => + Option(Utils.resolveURI(uri).getScheme).getOrElse("file") != "local" + } + } + } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/MountSmallFilesBootstrap.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/MountSmallFilesBootstrap.scala new file mode 100644 index 0000000000000..79919b511ec37 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/MountSmallFilesBootstrap.scala @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.kubernetes.submit + +import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, Pod, PodBuilder} + +import org.apache.spark.deploy.kubernetes.constants._ + +private[spark] trait MountSmallFilesBootstrap { + def mountSmallFilesSecret(pod: Pod, container: Container): (Pod, Container) +} + +private[spark] class MountSmallFilesBootstrapImpl( + secretName: String, secretMountPath: String) extends MountSmallFilesBootstrap { + def mountSmallFilesSecret(pod: Pod, container: Container): (Pod, Container) = { + val resolvedPod = new PodBuilder(pod) + .editOrNewSpec() + .addNewVolume() + .withName("submitted-files") + .withNewSecret() + .withSecretName(secretName) + .endSecret() + .endVolume() + .endSpec() + .build() + val resolvedContainer = new ContainerBuilder(container) + .addNewEnv() + .withName(ENV_MOUNTED_FILES_FROM_SECRET_DIR) + .withValue(secretMountPath) + .endEnv() + .addNewVolumeMount() + .withName("submitted-files") + .withMountPath(secretMountPath) + .endVolumeMount() + .build() + (resolvedPod, resolvedContainer) + } +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/DependencyResolutionStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/DependencyResolutionStep.scala index dddc62410d6c9..090240420119e 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/DependencyResolutionStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/DependencyResolutionStep.scala @@ -23,6 +23,7 @@ import io.fabric8.kubernetes.api.model.ContainerBuilder import org.apache.spark.deploy.kubernetes.constants._ import org.apache.spark.deploy.kubernetes.submit.KubernetesFileUtils +import org.apache.spark.deploy.kubernetes.submit.submitsteps.initcontainer.InitContainerConfigurationStep import org.apache.spark.util.Utils /** @@ -36,11 +37,12 @@ private[spark] class DependencyResolutionStep( sparkJars: Seq[String], sparkFiles: Seq[String], jarsDownloadPath: String, - filesDownloadPath: String) extends DriverConfigurationStep { + localFilesDownloadPath: String) extends DriverConfigurationStep { override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = { val resolvedSparkJars = KubernetesFileUtils.resolveSubmittedUris(sparkJars, jarsDownloadPath) - val resolvedSparkFiles = KubernetesFileUtils.resolveSubmittedUris(sparkFiles, filesDownloadPath) + val resolvedSparkFiles = KubernetesFileUtils.resolveSubmittedUris( + sparkFiles, localFilesDownloadPath) val sparkConfResolvedSparkDependencies = driverSpec.driverSparkConf.clone() if (resolvedSparkJars.nonEmpty) { sparkConfResolvedSparkDependencies.set("spark.jars", resolvedSparkJars.mkString(",")) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/InitContainerBootstrapStep.scala 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/InitContainerBootstrapStep.scala index 29cad18c484c0..b4248338cc8de 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/InitContainerBootstrapStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/InitContainerBootstrapStep.scala @@ -33,32 +33,32 @@ private[spark] class InitContainerBootstrapStep( override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = { var currentInitContainerSpec = InitContainerSpec( - initContainerProperties = Map.empty[String, String], - additionalDriverSparkConf = Map.empty[String, String], - initContainer = new ContainerBuilder().build(), - driverContainer = driverSpec.driverContainer, - podToInitialize = driverSpec.driverPod, - initContainerDependentResources = Seq.empty[HasMetadata]) + initContainerProperties = Map.empty[String, String], + additionalDriverSparkConf = Map.empty[String, String], + initContainer = new ContainerBuilder().build(), + driverContainer = driverSpec.driverContainer, + podToInitialize = driverSpec.driverPod, + initContainerDependentResources = Seq.empty[HasMetadata]) for (nextStep <- initContainerConfigurationSteps) { currentInitContainerSpec = nextStep.configureInitContainer(currentInitContainerSpec) } val configMap = PropertiesConfigMapFromScalaMapBuilder.buildConfigMap( - initContainerConfigMapName, - initContainerConfigMapKey, - currentInitContainerSpec.initContainerProperties) + initContainerConfigMapName, + initContainerConfigMapKey, + currentInitContainerSpec.initContainerProperties) val resolvedDriverSparkConf = driverSpec.driverSparkConf.clone() - .set(EXECUTOR_INIT_CONTAINER_CONFIG_MAP, initContainerConfigMapName) - .set(EXECUTOR_INIT_CONTAINER_CONFIG_MAP_KEY, initContainerConfigMapKey) - .setAll(currentInitContainerSpec.additionalDriverSparkConf) + .set(EXECUTOR_INIT_CONTAINER_CONFIG_MAP, initContainerConfigMapName) + .set(EXECUTOR_INIT_CONTAINER_CONFIG_MAP_KEY, initContainerConfigMapKey) + .setAll(currentInitContainerSpec.additionalDriverSparkConf) val resolvedDriverPod = InitContainerUtil.appendInitContainer( - currentInitContainerSpec.podToInitialize, currentInitContainerSpec.initContainer) + currentInitContainerSpec.podToInitialize, currentInitContainerSpec.initContainer) driverSpec.copy( - driverPod = resolvedDriverPod, - driverContainer = currentInitContainerSpec.driverContainer, - driverSparkConf = resolvedDriverSparkConf, - otherKubernetesResources = - driverSpec.otherKubernetesResources ++ - currentInitContainerSpec.initContainerDependentResources ++ - Seq(configMap)) + driverPod = resolvedDriverPod, + driverContainer = currentInitContainerSpec.driverContainer, + driverSparkConf = resolvedDriverSparkConf, + otherKubernetesResources = + driverSpec.otherKubernetesResources ++ + currentInitContainerSpec.initContainerDependentResources ++ + Seq(configMap)) } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/MountSmallLocalFilesStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/MountSmallLocalFilesStep.scala new file mode 100644 index 0000000000000..cd1b7f6b7eb7e --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/MountSmallLocalFilesStep.scala @@ -0,0 +1,73 @@ +/* + 
* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.kubernetes.submit.submitsteps + +import java.io.File + +import com.google.common.io.{BaseEncoding, Files} +import io.fabric8.kubernetes.api.model.SecretBuilder +import scala.collection.JavaConverters._ + +import org.apache.spark.deploy.kubernetes.config._ +import org.apache.spark.deploy.kubernetes.submit.{KubernetesFileUtils, MountSmallFilesBootstrap} +import org.apache.spark.util.Utils + +private[spark] class MountSmallLocalFilesStep( + sparkFiles: Seq[String], + smallFilesSecretName: String, + smallFilesSecretMountPath: String, + mountSmallFilesBootstrap: MountSmallFilesBootstrap) extends DriverConfigurationStep { + + import MountSmallLocalFilesStep._ + override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = { + val localFiles = KubernetesFileUtils.getOnlySubmitterLocalFiles(sparkFiles).map(new File(_)) + val totalSizeBytes = localFiles.map(_.length()).sum + val totalSizeBytesString = Utils.bytesToString(totalSizeBytes) + require(totalSizeBytes < MAX_SECRET_BUNDLE_SIZE_BYTES, + s"Total size of all files submitted must be less than $MAX_SECRET_BUNDLE_SIZE_BYTES_STRING" + + s" if you do not use a resource staging server. The total size of all submitted local" + + s" files is $totalSizeBytesString. 
Please install a resource staging server and configure" + + s" your application to use it via ${RESOURCE_STAGING_SERVER_URI.key}") + val localFileBase64Contents = localFiles.map { file => + val fileBase64 = BaseEncoding.base64().encode(Files.toByteArray(file)) + (file.getName, fileBase64) + }.toMap + val localFilesSecret = new SecretBuilder() + .withNewMetadata() + .withName(smallFilesSecretName) + .endMetadata() + .withData(localFileBase64Contents.asJava) + .build() + val (resolvedDriverPod, resolvedDriverContainer) = + mountSmallFilesBootstrap.mountSmallFilesSecret( + driverSpec.driverPod, driverSpec.driverContainer) + val resolvedSparkConf = driverSpec.driverSparkConf.clone() + .set(EXECUTOR_SUBMITTED_SMALL_FILES_SECRET, smallFilesSecretName) + .set(EXECUTOR_SUBMITTED_SMALL_FILES_SECRET_MOUNT_PATH, smallFilesSecretMountPath) + driverSpec.copy( + driverPod = resolvedDriverPod, + driverContainer = resolvedDriverContainer, + driverSparkConf = resolvedSparkConf, + otherKubernetesResources = driverSpec.otherKubernetesResources ++ Seq(localFilesSecret)) + } +} + +private[spark] object MountSmallLocalFilesStep { + val MAX_SECRET_BUNDLE_SIZE_BYTES = 10240 + val MAX_SECRET_BUNDLE_SIZE_BYTES_STRING = + Utils.bytesToString(MAX_SECRET_BUNDLE_SIZE_BYTES) +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala index fa0ecca3b4ee6..b89e81bcb0be9 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala @@ -24,6 +24,7 @@ import org.apache.spark.SparkContext import org.apache.spark.deploy.kubernetes.{InitContainerResourceStagingServerSecretPluginImpl, SparkKubernetesClientFactory, SparkPodInitContainerBootstrapImpl} import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.kubernetes.constants._ +import org.apache.spark.deploy.kubernetes.submit.MountSmallFilesBootstrapImpl import org.apache.spark.internal.Logging import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl} @@ -40,28 +41,30 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit override def createSchedulerBackend(sc: SparkContext, masterURL: String, scheduler: TaskScheduler) : SchedulerBackend = { val sparkConf = sc.getConf - val maybeConfigMap = sparkConf.get(EXECUTOR_INIT_CONTAINER_CONFIG_MAP) - val maybeConfigMapKey = sparkConf.get(EXECUTOR_INIT_CONTAINER_CONFIG_MAP_KEY) + val maybeInitContainerConfigMap = sparkConf.get(EXECUTOR_INIT_CONTAINER_CONFIG_MAP) + val maybeInitContainerConfigMapKey = sparkConf.get(EXECUTOR_INIT_CONTAINER_CONFIG_MAP_KEY) + val maybeSubmittedFilesSecret = sparkConf.get(EXECUTOR_SUBMITTED_SMALL_FILES_SECRET) + val maybeSubmittedFilesSecretMountPath = sparkConf.get( + EXECUTOR_SUBMITTED_SMALL_FILES_SECRET_MOUNT_PATH) val maybeExecutorInitContainerSecretName = sparkConf.get(EXECUTOR_INIT_CONTAINER_SECRET) - val maybeExecutorInitContainerSecretMount = + val maybeExecutorInitContainerSecretMountPath = sparkConf.get(EXECUTOR_INIT_CONTAINER_SECRET_MOUNT_DIR) val executorInitContainerSecretVolumePlugin = for { initContainerSecretName <- maybeExecutorInitContainerSecretName - 
initContainerSecretMountPath <- maybeExecutorInitContainerSecretMount + initContainerSecretMountPath <- maybeExecutorInitContainerSecretMountPath } yield { new InitContainerResourceStagingServerSecretPluginImpl( initContainerSecretName, initContainerSecretMountPath) } // Only set up the bootstrap if they've provided both the config map key and the config map - // name. Note that we generally expect both to have been set from spark-submit V2, but for - // testing developers may simply run the driver JVM locally, but the config map won't be set - // then. - val bootStrap = for { - configMap <- maybeConfigMap - configMapKey <- maybeConfigMapKey + // name. The config map might not be provided if init-containers aren't being used to + // bootstrap dependencies. + val executorInitContainerbootStrap = for { + configMap <- maybeInitContainerConfigMap + configMapKey <- maybeInitContainerConfigMapKey } yield { new SparkPodInitContainerBootstrapImpl( sparkConf.get(INIT_CONTAINER_DOCKER_IMAGE), @@ -72,11 +75,17 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit configMap, configMapKey) } - if (maybeConfigMap.isEmpty) { + val mountSmallFilesBootstrap = for { + secretName <- maybeSubmittedFilesSecret + secretMountPath <- maybeSubmittedFilesSecretMountPath + } yield { + new MountSmallFilesBootstrapImpl(secretName, secretMountPath) + } + if (maybeInitContainerConfigMap.isEmpty) { logWarning("The executor's init-container config map was not specified. Executors will" + " therefore not attempt to fetch remote or submitted dependencies.") } - if (maybeConfigMapKey.isEmpty) { + if (maybeInitContainerConfigMapKey.isEmpty) { logWarning("The executor's init-container config map key was not specified. Executors will" + " therefore not attempt to fetch remote or submitted dependencies.") } @@ -90,8 +99,9 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit new KubernetesClusterSchedulerBackend( sc.taskScheduler.asInstanceOf[TaskSchedulerImpl], sc, - bootStrap, + executorInitContainerbootStrap, executorInitContainerSecretVolumePlugin, + mountSmallFilesBootstrap, kubernetesClient) } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala index 4eae6ee3184ba..759914d1ad9cf 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala @@ -22,22 +22,21 @@ import java.util.Collections import java.util.concurrent.{ConcurrentHashMap, TimeUnit} import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, AtomicReference} -import scala.collection.{concurrent, mutable} -import scala.collection.JavaConverters._ -import scala.concurrent.{ExecutionContext, Future} - import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.scala.DefaultScalaModule import io.fabric8.kubernetes.api.model._ import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watcher} import io.fabric8.kubernetes.client.Watcher.Action import org.apache.commons.io.FilenameUtils +import scala.collection.{concurrent, mutable} +import scala.collection.JavaConverters._ +import 
scala.concurrent.{ExecutionContext, Future} import org.apache.spark.{SparkContext, SparkEnv, SparkException} import org.apache.spark.deploy.kubernetes.{ConfigurationUtils, InitContainerResourceStagingServerSecretPlugin, PodWithDetachedInitContainer, SparkPodInitContainerBootstrap} import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.kubernetes.constants._ -import org.apache.spark.deploy.kubernetes.submit.InitContainerUtil +import org.apache.spark.deploy.kubernetes.submit.{InitContainerUtil, MountSmallFilesBootstrap} import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.network.shuffle.kubernetes.KubernetesExternalShuffleClient import org.apache.spark.rpc.{RpcAddress, RpcCallContext, RpcEndpointAddress, RpcEnv} @@ -51,6 +50,7 @@ private[spark] class KubernetesClusterSchedulerBackend( val sc: SparkContext, executorInitContainerBootstrap: Option[SparkPodInitContainerBootstrap], executorMountInitContainerSecretPlugin: Option[InitContainerResourceStagingServerSecretPlugin], + mountSmallFilesBootstrap: Option[MountSmallFilesBootstrap], kubernetesClient: KubernetesClient) extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) { @@ -557,13 +557,18 @@ private[spark] class KubernetesClusterSchedulerBackend( .build() } }.getOrElse(executorPod) + val (withMaybeSmallFilesMountedPod, withMaybeSmallFilesMountedContainer) = + mountSmallFilesBootstrap.map { bootstrap => + bootstrap.mountSmallFilesSecret( + withMaybeShuffleConfigPod, withMaybeShuffleConfigExecutorContainer) + }.getOrElse(withMaybeShuffleConfigPod, withMaybeShuffleConfigExecutorContainer) val (executorPodWithInitContainer, initBootstrappedExecutorContainer) = executorInitContainerBootstrap.map { bootstrap => val podWithDetachedInitContainer = bootstrap.bootstrapInitContainerAndVolumes( PodWithDetachedInitContainer( - withMaybeShuffleConfigPod, + withMaybeSmallFilesMountedPod, new ContainerBuilder().build(), - withMaybeShuffleConfigExecutorContainer)) + withMaybeSmallFilesMountedContainer)) val resolvedInitContainer = executorMountInitContainerSecretPlugin.map { plugin => plugin.mountResourceStagingServerSecretIntoInitContainer( @@ -578,7 +583,7 @@ private[spark] class KubernetesClusterSchedulerBackend( }.getOrElse(podWithAttachedInitContainer) (resolvedPodWithMountedSecret, podWithDetachedInitContainer.mainContainer) - }.getOrElse((withMaybeShuffleConfigPod, withMaybeShuffleConfigExecutorContainer)) + }.getOrElse((withMaybeSmallFilesMountedPod, withMaybeSmallFilesMountedContainer)) val executorPodWithNodeAffinity = addNodeAffinityAnnotationIfUseful( executorPodWithInitContainer, nodeToLocalTaskCount) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/DriverConfigurationStepsOrchestratorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/DriverConfigurationStepsOrchestratorSuite.scala index e4f221ad99cc5..c168e7b5407ba 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/DriverConfigurationStepsOrchestratorSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/DriverConfigurationStepsOrchestratorSuite.scala @@ -17,7 +17,8 @@ package org.apache.spark.deploy.kubernetes.submit import org.apache.spark.{SparkConf, SparkFunSuite} -import org.apache.spark.deploy.kubernetes.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverKubernetesCredentialsStep, 
InitContainerBootstrapStep, PythonStep} +import org.apache.spark.deploy.kubernetes.config._ +import org.apache.spark.deploy.kubernetes.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, InitContainerBootstrapStep, MountSmallLocalFilesStep, PythonStep} private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunSuite { @@ -31,7 +32,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS test("Base submission steps without an init-container or python files.") { val sparkConf = new SparkConf(false) - .set("spark.jars", "local:///var/apps/jars/jar1.jar") + .set("spark.jars", "local:///var/apps/jars/jar1.jar") val mainAppResource = JavaMainAppResource("local:///var/apps/jars/main.jar") val orchestrator = new DriverConfigurationStepsOrchestrator( NAMESPACE, @@ -43,16 +44,17 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS APP_ARGS, ADDITIONAL_PYTHON_FILES, sparkConf) - val steps = orchestrator.getAllConfigurationSteps() - assert(steps.size === 3) - assert(steps(0).isInstanceOf[BaseDriverConfigurationStep]) - assert(steps(1).isInstanceOf[DriverKubernetesCredentialsStep]) - assert(steps(2).isInstanceOf[DependencyResolutionStep]) + validateStepTypes( + orchestrator, + classOf[BaseDriverConfigurationStep], + classOf[DriverKubernetesCredentialsStep], + classOf[DependencyResolutionStep]) } test("Submission steps with an init-container.") { val sparkConf = new SparkConf(false) - .set("spark.jars", "hdfs://localhost:9000/var/apps/jars/jar1.jar") + .set("spark.jars", "hdfs://localhost:9000/var/apps/jars/jar1.jar") + .set(RESOURCE_STAGING_SERVER_URI, "https://localhost:8080") val mainAppResource = JavaMainAppResource("local:///var/apps/jars/main.jar") val orchestrator = new DriverConfigurationStepsOrchestrator( NAMESPACE, @@ -64,12 +66,12 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS APP_ARGS, ADDITIONAL_PYTHON_FILES, sparkConf) - val steps = orchestrator.getAllConfigurationSteps() - assert(steps.size === 4) - assert(steps(0).isInstanceOf[BaseDriverConfigurationStep]) - assert(steps(1).isInstanceOf[DriverKubernetesCredentialsStep]) - assert(steps(2).isInstanceOf[DependencyResolutionStep]) - assert(steps(3).isInstanceOf[InitContainerBootstrapStep]) + validateStepTypes( + orchestrator, + classOf[BaseDriverConfigurationStep], + classOf[DriverKubernetesCredentialsStep], + classOf[DependencyResolutionStep], + classOf[InitContainerBootstrapStep]) } test("Submission steps with python files.") { @@ -85,11 +87,40 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS APP_ARGS, ADDITIONAL_PYTHON_FILES, sparkConf) + validateStepTypes( + orchestrator, + classOf[BaseDriverConfigurationStep], + classOf[DriverKubernetesCredentialsStep], + classOf[DependencyResolutionStep], + classOf[PythonStep]) + } + + test("Only local files without a resource staging server.") { + val sparkConf = new SparkConf(false).set("spark.files", "/var/spark/file1.txt") + val mainAppResource = JavaMainAppResource("local:///var/apps/jars/main.jar") + val orchestrator = new DriverConfigurationStepsOrchestrator( + NAMESPACE, + APP_ID, + LAUNCH_TIME, + mainAppResource, + APP_NAME, + MAIN_CLASS, + APP_ARGS, + ADDITIONAL_PYTHON_FILES, + sparkConf) + validateStepTypes( + orchestrator, + classOf[BaseDriverConfigurationStep], + classOf[DriverKubernetesCredentialsStep], + classOf[DependencyResolutionStep], + 
classOf[MountSmallLocalFilesStep]) + } + + private def validateStepTypes( + orchestrator: DriverConfigurationStepsOrchestrator, + types: Class[_ <: DriverConfigurationStep]*): Unit = { val steps = orchestrator.getAllConfigurationSteps() - assert(steps.size === 4) - assert(steps(0).isInstanceOf[BaseDriverConfigurationStep]) - assert(steps(1).isInstanceOf[DriverKubernetesCredentialsStep]) - assert(steps(2).isInstanceOf[DependencyResolutionStep]) - assert(steps(3).isInstanceOf[PythonStep]) + assert(steps.size === types.size) + assert(steps.map(_.getClass) === types) } } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/MountSmallLocalFilesStepTest.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/MountSmallLocalFilesStepTest.scala new file mode 100644 index 0000000000000..11744ef409c20 --- /dev/null +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/MountSmallLocalFilesStepTest.scala @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.kubernetes.submit.submitsteps + +import java.io.{File, RandomAccessFile} + +import com.google.common.base.Charsets +import com.google.common.io.{BaseEncoding, Files} +import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, HasMetadata, Pod, PodBuilder, Secret} +import org.junit.Test +import org.mockito.{Mock, MockitoAnnotations} +import org.scalatest.BeforeAndAfter +import scala.collection.JavaConverters._ + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.deploy.kubernetes.config._ +import org.apache.spark.deploy.kubernetes.constants._ +import org.apache.spark.deploy.kubernetes.submit.MountSmallFilesBootstrap +import org.apache.spark.util.Utils + +private[spark] class MountSmallLocalFilesStepTest extends SparkFunSuite with BeforeAndAfter { + + private val FIRST_TEMP_FILE_NAME = "file1.txt" + private val SECOND_TEMP_FILE_NAME = "file2.txt" + private val FIRST_TEMP_FILE_CONTENTS = "123" + private val SECOND_TEMP_FILE_CONTENTS = "456" + private val REMOTE_FILE_URI = "hdfs://localhost:9000/file3.txt" + private val SECRET_NAME = "secret" + + private var tempFolder: File = _ + + private val mountSmallFilesBootstrap = new DummyMountSmallFilesBootstrap + + before { + MockitoAnnotations.initMocks(this) + tempFolder = Utils.createTempDir() + } + + after { + tempFolder.delete() + } + + test("Local files should be added to the secret.") { + val firstTempFile = createTempFileWithContents( + tempFolder, FIRST_TEMP_FILE_NAME, FIRST_TEMP_FILE_CONTENTS) + val secondTempFile = createTempFileWithContents( + tempFolder, SECOND_TEMP_FILE_NAME, SECOND_TEMP_FILE_CONTENTS) + val sparkFiles = Seq( + firstTempFile.getAbsolutePath, + secondTempFile.getAbsolutePath, + REMOTE_FILE_URI) + val configurationStep = new MountSmallLocalFilesStep( + sparkFiles, + SECRET_NAME, + MOUNTED_SMALL_FILES_SECRET_MOUNT_PATH, + mountSmallFilesBootstrap) + val baseDriverSpec = new KubernetesDriverSpec( + new PodBuilder().build(), + new ContainerBuilder().build(), + Seq.empty[HasMetadata], + new SparkConf(false)) + val configuredDriverSpec = configurationStep.configureDriver(baseDriverSpec) + assert(configuredDriverSpec.otherKubernetesResources.size === 1) + assert(configuredDriverSpec.otherKubernetesResources(0).isInstanceOf[Secret]) + val localFilesSecret = configuredDriverSpec.otherKubernetesResources(0).asInstanceOf[Secret] + assert(localFilesSecret.getMetadata.getName === SECRET_NAME) + val expectedSecretContents = Map( + FIRST_TEMP_FILE_NAME -> BaseEncoding.base64().encode( + FIRST_TEMP_FILE_CONTENTS.getBytes(Charsets.UTF_8)), + SECOND_TEMP_FILE_NAME -> BaseEncoding.base64().encode( + SECOND_TEMP_FILE_CONTENTS.getBytes(Charsets.UTF_8))) + assert(localFilesSecret.getData.asScala === expectedSecretContents) + assert(configuredDriverSpec.driverPod.getMetadata.getLabels.asScala === + Map(mountSmallFilesBootstrap.LABEL_KEY -> mountSmallFilesBootstrap.LABEL_VALUE)) + assert(configuredDriverSpec.driverContainer.getEnv.size() === 1) + assert(configuredDriverSpec.driverContainer.getEnv.get(0).getName === + mountSmallFilesBootstrap.ENV_KEY) + assert(configuredDriverSpec.driverContainer.getEnv.get(0).getValue === + mountSmallFilesBootstrap.ENV_VALUE) + assert(configuredDriverSpec.driverSparkConf.get( + EXECUTOR_SUBMITTED_SMALL_FILES_SECRET) === + Some(SECRET_NAME)) + assert(configuredDriverSpec.driverSparkConf.get( + EXECUTOR_SUBMITTED_SMALL_FILES_SECRET_MOUNT_PATH) === + Some(MOUNTED_SMALL_FILES_SECRET_MOUNT_PATH)) + } + + test("Using large files should throw an 
exception.") { + val largeTempFileContents = BaseEncoding.base64().encode(new Array[Byte](10241)) + val largeTempFile = createTempFileWithContents(tempFolder, "large.txt", largeTempFileContents) + val configurationStep = new MountSmallLocalFilesStep( + Seq(largeTempFile.getAbsolutePath), + SECRET_NAME, + MOUNTED_SMALL_FILES_SECRET_MOUNT_PATH, + mountSmallFilesBootstrap) + val baseDriverSpec = new KubernetesDriverSpec( + new PodBuilder().build(), + new ContainerBuilder().build(), + Seq.empty[HasMetadata], + new SparkConf(false)) + try { + configurationStep.configureDriver(baseDriverSpec) + fail("Using the small local files mounter should not be allowed with big files.") + } catch { + case e: Throwable => + assert(e.getMessage === + s"requirement failed: Total size of all files submitted must be less than" + + s" ${MountSmallLocalFilesStep.MAX_SECRET_BUNDLE_SIZE_BYTES_STRING} if you do not" + + s" use a resource staging server. The total size of all submitted local" + + s" files is ${Utils.bytesToString(largeTempFile.length())}. Please install a" + + s" resource staging server and configure your application to use it via" + + s" ${RESOURCE_STAGING_SERVER_URI.key}" + ) + } + } + + private def createTempFileWithContents( + root: File, + fileName: String, + fileContents: String): File = { + val tempFile = new File(root, fileName) + tempFile.createNewFile() + Files.write(fileContents, tempFile, Charsets.UTF_8) + tempFile + } + + private class DummyMountSmallFilesBootstrap extends MountSmallFilesBootstrap { + val LABEL_KEY = "smallFilesLabelKey" + val LABEL_VALUE = "smallFilesLabelValue" + val ENV_KEY = "smallFilesEnvKey" + val ENV_VALUE = "smallFilesEnvValue" + + override def mountSmallFilesSecret(pod: Pod, container: Container): (Pod, Container) = { + val editedPod = new PodBuilder(pod) + .editOrNewMetadata() + .addToLabels(LABEL_KEY, LABEL_VALUE) + .endMetadata() + .build() + val editedContainer = new ContainerBuilder(container) + .addNewEnv() + .withName(ENV_KEY) + .withValue(ENV_VALUE) + .endEnv() + .build() + (editedPod, editedContainer) + } + } +} diff --git a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver-py/Dockerfile b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver-py/Dockerfile index 731ea897458ce..80388856796f9 100644 --- a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver-py/Dockerfile +++ b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver-py/Dockerfile @@ -43,6 +43,7 @@ CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \ if ! [ -z ${SPARK_SUBMIT_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_SUBMIT_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \ if ! [ -z ${SPARK_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \ if ! [ -z ${SPARK_MOUNTED_FILES_DIR} ]; then cp -R "$SPARK_MOUNTED_FILES_DIR/." .; fi && \ + if ! [ -z ${SPARK_MOUNTED_FILES_FROM_SECRET_DIR} ]; then cp -R "$SPARK_MOUNTED_FILES_FROM_SECRET_DIR/." 
.; fi && \ ${JAVA_HOME}/bin/java $SPARK_DRIVER_JAVA_OPTS -cp $SPARK_CLASSPATH \ -Xms$SPARK_DRIVER_MEMORY -Xmx$SPARK_DRIVER_MEMORY \ $SPARK_DRIVER_CLASS $PYSPARK_PRIMARY $PYSPARK_FILES $SPARK_DRIVER_ARGS diff --git a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver/Dockerfile b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver/Dockerfile index bd28af950f4dd..8e162d0dab51f 100644 --- a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver/Dockerfile +++ b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver/Dockerfile @@ -28,4 +28,5 @@ CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \ if ! [ -z ${SPARK_SUBMIT_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_SUBMIT_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \ if ! [ -z ${SPARK_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \ if ! [ -z ${SPARK_MOUNTED_FILES_DIR} ]; then cp -R "$SPARK_MOUNTED_FILES_DIR/." .; fi && \ + if ! [ -z ${SPARK_MOUNTED_FILES_FROM_SECRET_DIR} ]; then cp -R "$SPARK_MOUNTED_FILES_FROM_SECRET_DIR/." .; fi && \ ${JAVA_HOME}/bin/java $SPARK_DRIVER_JAVA_OPTS -cp $SPARK_CLASSPATH -Xms$SPARK_DRIVER_MEMORY -Xmx$SPARK_DRIVER_MEMORY $SPARK_DRIVER_CLASS $SPARK_DRIVER_ARGS diff --git a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor-py/Dockerfile b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor-py/Dockerfile index f52578ad6edda..2b6aae6d7a3f2 100644 --- a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor-py/Dockerfile +++ b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor-py/Dockerfile @@ -43,4 +43,5 @@ CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \ if ! [ -z ${SPARK_MOUNTED_CLASSPATH}+x} ]; then SPARK_CLASSPATH="$SPARK_MOUNTED_CLASSPATH:$SPARK_CLASSPATH"; fi && \ if ! [ -z ${SPARK_EXECUTOR_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXECUTOR_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \ if ! [ -z ${SPARK_MOUNTED_FILES_DIR} ]; then cp -R "$SPARK_MOUNTED_FILES_DIR/." .; fi && \ + if ! [ -z ${SPARK_MOUNTED_FILES_FROM_SECRET_DIR} ]; then cp -R "$SPARK_MOUNTED_FILES_FROM_SECRET_DIR/." .; fi && \ ${JAVA_HOME}/bin/java -Dspark.executor.port=$SPARK_EXECUTOR_PORT -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp $SPARK_CLASSPATH org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP diff --git a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor/Dockerfile b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor/Dockerfile index 8ad935ca396b1..e0d9b8245ecfc 100644 --- a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor/Dockerfile +++ b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor/Dockerfile @@ -29,4 +29,5 @@ CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \ if ! [ -z ${SPARK_EXECUTOR_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXECUTOR_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \ if ! [ -z ${SPARK_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \ if ! [ -z ${SPARK_MOUNTED_FILES_DIR} ]; then cp -R "$SPARK_MOUNTED_FILES_DIR/." .; fi && \ + if ! [ -z ${SPARK_MOUNTED_FILES_FROM_SECRET_DIR} ]; then cp -R "$SPARK_MOUNTED_FILES_FROM_SECRET_DIR/." 
.; fi && \ ${JAVA_HOME}/bin/java -Dspark.executor.port=$SPARK_EXECUTOR_PORT -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp $SPARK_CLASSPATH org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP diff --git a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/init-container/Dockerfile b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/init-container/Dockerfile index 2ab3e6295b6d8..1178dd2428448 100644 --- a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/init-container/Dockerfile +++ b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/init-container/Dockerfile @@ -21,4 +21,4 @@ FROM spark-base # command should be invoked from the top level directory of the Spark distribution. E.g.: # docker build -t spark-init:latest -f dockerfiles/init-container/Dockerfile . -ENTRYPOINT [ "/opt/entrypoint.sh", "bin/spark-class", "org.apache.spark.deploy.rest.kubernetes.KubernetesSparkDependencyDownloadInitContainer" ] +ENTRYPOINT [ "/opt/entrypoint.sh", "/opt/spark/bin/spark-class", "org.apache.spark.deploy.rest.kubernetes.KubernetesSparkDependencyDownloadInitContainer" ] diff --git a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/resource-staging-server/Dockerfile b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/resource-staging-server/Dockerfile index 0e0c9dd31aad6..87ed7d10f3eb3 100644 --- a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/resource-staging-server/Dockerfile +++ b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/resource-staging-server/Dockerfile @@ -22,4 +22,4 @@ FROM spark-base # command should be invoked from the top level directory of the Spark distribution. E.g.: # docker build -t spark-resource-staging-server:latest -f dockerfiles/resource-staging-server/Dockerfile . -ENTRYPOINT [ "/opt/entrypoint.sh", "bin/spark-class", "org.apache.spark.deploy.rest.kubernetes.ResourceStagingServer" ] +ENTRYPOINT [ "/opt/entrypoint.sh", "/opt/spark/bin/spark-class", "org.apache.spark.deploy.rest.kubernetes.ResourceStagingServer" ] diff --git a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/shuffle-service/Dockerfile b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/shuffle-service/Dockerfile index 2ae0be4ee6c32..b76e66d316c5c 100644 --- a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/shuffle-service/Dockerfile +++ b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/shuffle-service/Dockerfile @@ -21,6 +21,4 @@ FROM spark-base # command should be invoked from the top level directory of the Spark distribution. E.g.: # docker build -t spark-shuffle:latest -f dockerfiles/shuffle-service/Dockerfile . 
-COPY examples /opt/spark/examples - -ENTRYPOINT [ "/opt/entrypoint.sh", "bin/spark-class", "org.apache.spark.deploy.kubernetes.KubernetesExternalShuffleService", "1" ] +ENTRYPOINT [ "/opt/entrypoint.sh", "/opt/spark/bin/spark-class", "org.apache.spark.deploy.kubernetes.KubernetesExternalShuffleService", "1" ] diff --git a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/spark-base/Dockerfile b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/spark-base/Dockerfile index 61d295a5b37c2..84e09604266b8 100644 --- a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/spark-base/Dockerfile +++ b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/spark-base/Dockerfile @@ -23,6 +23,7 @@ FROM openjdk:8-alpine RUN apk upgrade --no-cache && \ apk add --no-cache bash tini && \ mkdir -p /opt/spark && \ + mkdir -p /opt/spark/work-dir \ touch /opt/spark/RELEASE && \ chgrp root /etc/passwd && chmod ug+rw /etc/passwd @@ -34,6 +35,6 @@ COPY dockerfiles/spark-base/entrypoint.sh /opt/ ENV SPARK_HOME /opt/spark -WORKDIR /opt/spark +WORKDIR /opt/spark/work-dir ENTRYPOINT [ "/opt/entrypoint.sh" ] diff --git a/resource-managers/kubernetes/integration-tests-spark-jobs/src/main/scala/org/apache/spark/deploy/kubernetes/integrationtest/jobs/FileExistenceTest.scala b/resource-managers/kubernetes/integration-tests-spark-jobs/src/main/scala/org/apache/spark/deploy/kubernetes/integrationtest/jobs/FileExistenceTest.scala index 8994c998bffee..a9e328f4ff248 100644 --- a/resource-managers/kubernetes/integration-tests-spark-jobs/src/main/scala/org/apache/spark/deploy/kubernetes/integrationtest/jobs/FileExistenceTest.scala +++ b/resource-managers/kubernetes/integration-tests-spark-jobs/src/main/scala/org/apache/spark/deploy/kubernetes/integrationtest/jobs/FileExistenceTest.scala @@ -16,6 +16,7 @@ */ package org.apache.spark.deploy.kubernetes.integrationtest.jobs +import java.io.File import java.nio.file.Paths import com.google.common.base.Charsets @@ -45,6 +46,20 @@ private[spark] object FileExistenceTest { } else { println(s"File found at ${file.getAbsolutePath} with correct contents.") } + val spark = SparkSession.builder().getOrCreate().sparkContext + val fileNamesRdd = spark.parallelize(Seq(args(0))) + if (fileNamesRdd.filter(fileName => new File(fileName).isFile()).count() != 1) { + throw new SparkException(s"Executors do not have the file ${args(0)}.") + } + val matchingContents = fileNamesRdd.map { fileName => + Files.toString(new File(fileName), Charsets.UTF_8) + }.filter(_.equals(args(1))) + if (matchingContents.count() != 1) { + throw new SparkException(s"The file on the executors at ${args(0)} did not have" + + s" the correct contents.") + } + println(s"File found on the executors at the relative path ${args(0)} with the" + + s" correct contents.") // scalastyle:on println } while (true) { diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala index c6cd6a74c88d1..80b6685953fac 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala @@ -213,19 +213,40 @@ private[spark] class KubernetesSuite 
extends SparkFunSuite with BeforeAndAfter { test("Added files should be placed in the driver's working directory.") { assume(testBackend.name == MINIKUBE_TEST_BACKEND) + launchStagingServer(SSLOptions(), None) val testExistenceFileTempDir = Utils.createTempDir(namePrefix = "test-existence-file-temp-dir") val testExistenceFile = new File(testExistenceFileTempDir, "input.txt") Files.write(TEST_EXISTENCE_FILE_CONTENTS, testExistenceFile, Charsets.UTF_8) - launchStagingServer(SSLOptions(), None) sparkConf.set("spark.files", testExistenceFile.getAbsolutePath) runSparkApplicationAndVerifyCompletion( JavaMainAppResource(SUBMITTER_LOCAL_MAIN_APP_RESOURCE), FILE_EXISTENCE_MAIN_CLASS, - Seq(s"File found at /opt/spark/${testExistenceFile.getName} with correct contents."), + Seq( + s"File found at /opt/spark/work-dir/${testExistenceFile.getName} with correct contents.", + s"File found on the executors at the relative path ${testExistenceFile.getName} with" + + s" the correct contents."), Array(testExistenceFile.getName, TEST_EXISTENCE_FILE_CONTENTS), Seq.empty[String]) } + test("Submit small local files without the resource staging server.") { + assume(testBackend.name == MINIKUBE_TEST_BACKEND) + sparkConf.setJars(Seq(CONTAINER_LOCAL_HELPER_JAR_PATH)) + val testExistenceFileTempDir = Utils.createTempDir(namePrefix = "test-existence-file-temp-dir") + val testExistenceFile = new File(testExistenceFileTempDir, "input.txt") + Files.write(TEST_EXISTENCE_FILE_CONTENTS, testExistenceFile, Charsets.UTF_8) + sparkConf.set("spark.files", testExistenceFile.getAbsolutePath) + runSparkApplicationAndVerifyCompletion( + JavaMainAppResource(CONTAINER_LOCAL_MAIN_APP_RESOURCE), + FILE_EXISTENCE_MAIN_CLASS, + Seq( + s"File found at /opt/spark/work-dir/${testExistenceFile.getName} with correct contents.", + s"File found on the executors at the relative path ${testExistenceFile.getName} with" + + s" the correct contents."), + Array(testExistenceFile.getName, TEST_EXISTENCE_FILE_CONTENTS), + Seq.empty[String]) + } + test("Use a very long application name.") { assume(testBackend.name == MINIKUBE_TEST_BACKEND)
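(Closing note, not part of the patch: the integration tests above verify that a file passed via `spark.files` is visible by its bare name in the working directory of the driver and of every executor, because the entrypoints copy `$SPARK_MOUNTED_FILES_FROM_SECRET_DIR` into the new `/opt/spark/work-dir` working directory. A minimal application performing the same check at runtime might look like the sketch below; the file name is a placeholder.)

```scala
import java.io.File

import com.google.common.base.Charsets
import com.google.common.io.Files

import org.apache.spark.SparkException
import org.apache.spark.sql.SparkSession

object ReadSubmittedFileSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().getOrCreate()
    val fileName = "input.txt"  // hypothetical file passed with --files /path/to/input.txt
    // The driver sees the file in its working directory after the entrypoint's copy step.
    val driverContents = Files.toString(new File(fileName), Charsets.UTF_8)
    // Each executor container performs the same copy, so the relative path works there too.
    val executorContents = spark.sparkContext
      .parallelize(Seq(fileName), numSlices = 2)
      .map(name => Files.toString(new File(name), Charsets.UTF_8))
      .collect()
    if (!executorContents.forall(_ == driverContents)) {
      throw new SparkException(s"Executors saw different contents for $fileName.")
    }
    println(s"$fileName has the same contents on the driver and the executors.")
    spark.stop()
  }
}
```

Submitting such an application with `--files` pointing at a small local file, and with no resource staging server configured, should exercise the secret-based path introduced here, subject to the 10240-byte total-size limit enforced by MountSmallLocalFilesStep.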