From c57ccdc2898a4d9130a943f1fbd75bda238afa10 Mon Sep 17 00:00:00 2001 From: mccheah Date: Tue, 31 Jan 2017 12:07:01 -0800 Subject: [PATCH] Extract constants and config into separate file. Launch => Submit. (#65) * Extract constants and config into separate file. Launch => Submit. * Address comments * A small shorthand * Refactor more ThreadUtils * Fix scalastyle, use cached thread pool * Tiny Scala style change --- docs/running-on-kubernetes.md | 16 +- .../spark/deploy/kubernetes/Client.scala | 251 +++++++++--------- .../spark/deploy/kubernetes/config.scala | 177 ++++++++++++ .../spark/deploy/kubernetes/constants.scala | 70 +++++ .../rest/KubernetesRestProtocolMessages.scala | 21 +- .../kubernetes/KubernetesSparkRestApi.scala | 3 +- .../KubernetesClusterSchedulerBackend.scala | 162 +++++------ .../src/main/docker/driver/Dockerfile | 2 +- .../integrationtest/KubernetesSuite.scala | 18 +- 9 files changed, 470 insertions(+), 250 deletions(-) create mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala create mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index e25e189aa6d74..e256535fbbc9d 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -140,12 +140,12 @@ Spark supports using SSL to encrypt the traffic in this bootstrapping process. I whenever possible. See the [security page](security.html) and [configuration](configuration.html) sections for more information on -configuring SSL; use the prefix `spark.ssl.kubernetes.driverlaunch` in configuring the SSL-related fields in the context +configuring SSL; use the prefix `spark.ssl.kubernetes.submit` in configuring the SSL-related fields in the context of submitting to Kubernetes. 
For example, to set the trustStore used when the local machine communicates with the driver -pod in starting the application, set `spark.ssl.kubernetes.driverlaunch.trustStore`. +pod in starting the application, set `spark.ssl.kubernetes.submit.trustStore`. One note about the keyStore is that it can be specified as either a file on the client machine or a file in the -container image's disk. Thus `spark.ssl.kubernetes.driverlaunch.keyStore` can be a URI with a scheme of either `file:` +container image's disk. Thus `spark.ssl.kubernetes.submit.keyStore` can be a URI with a scheme of either `file:` or `container:`. A scheme of `file:` corresponds to the keyStore being located on the client machine; it is mounted onto the driver container as a [secret volume](https://kubernetes.io/docs/user-guide/secrets/). When the URI has the scheme `container:`, the file is assumed to already be on the container's disk at the appropriate path. @@ -235,7 +235,15 @@ from the other deployment modes. See the [configuration page](configuration.html (none) Custom labels that will be added to the driver pod. This should be a comma-separated list of label key-value pairs, - where each label is in the format key=value. + where each label is in the format key=value. Note that Spark also adds its own labels to the driver pod + for bookkeeping purposes. + + + + spark.kubernetes.driverSubmitTimeout + 60s + + Time to wait for the driver pod to start running before aborting its execution. 
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala index 07a45c7577bcd..fed9334dbbab4 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala @@ -18,13 +18,13 @@ package org.apache.spark.deploy.kubernetes import java.io.{File, FileInputStream} import java.security.{KeyStore, SecureRandom} -import java.util.concurrent.{Executors, TimeoutException, TimeUnit} +import java.util.concurrent.{TimeoutException, TimeUnit} import java.util.concurrent.atomic.AtomicBoolean import javax.net.ssl.{SSLContext, TrustManagerFactory, X509TrustManager} import com.google.common.base.Charsets import com.google.common.io.Files -import com.google.common.util.concurrent.{SettableFuture, ThreadFactoryBuilder} +import com.google.common.util.concurrent.SettableFuture import io.fabric8.kubernetes.api.model._ import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient, KubernetesClient, KubernetesClientException, Watcher} import io.fabric8.kubernetes.client.Watcher.Action @@ -34,11 +34,13 @@ import scala.collection.mutable import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.duration.DurationInt -import org.apache.spark.{SecurityManager, SPARK_VERSION => sparkVersion, SparkConf, SparkException, SSLOptions} +import org.apache.spark.{SecurityManager, SparkConf, SparkException, SSLOptions} +import org.apache.spark.deploy.kubernetes.config._ +import org.apache.spark.deploy.kubernetes.constants._ import org.apache.spark.deploy.rest.{AppResource, ContainerAppResource, KubernetesCreateSubmissionRequest, RemoteAppResource, TarGzippedData, UploadedAppResource} import org.apache.spark.deploy.rest.kubernetes._ import org.apache.spark.internal.Logging -import 
org.apache.spark.util.Utils +import org.apache.spark.util.{ThreadUtils, Utils} private[spark] class Client( sparkConf: SparkConf, @@ -47,25 +49,21 @@ private[spark] class Client( appArgs: Array[String]) extends Logging { import Client._ - private val namespace = sparkConf.get("spark.kubernetes.namespace", "default") + private val namespace = sparkConf.get(KUBERNETES_NAMESPACE) private val master = resolveK8sMaster(sparkConf.get("spark.master")) private val launchTime = System.currentTimeMillis private val appName = sparkConf.getOption("spark.app.name") - .orElse(sparkConf.getOption("spark.app.id")) .getOrElse("spark") private val kubernetesAppId = s"$appName-$launchTime".toLowerCase.replaceAll("\\.", "-") - private val secretName = s"spark-submission-server-secret-$kubernetesAppId" - private val secretDirectory = s"$SPARK_SUBMISSION_SECRET_BASE_DIR/$kubernetesAppId" - private val sslSecretsDirectory = s"$SPARK_SUBMISSION_SECRET_BASE_DIR/$kubernetesAppId-ssl" - private val sslSecretsName = s"spark-submission-server-ssl-$kubernetesAppId" - private val driverLauncherSelectorValue = s"driver-launcher-$launchTime" - private val driverDockerImage = sparkConf.get( - "spark.kubernetes.driver.docker.image", s"spark-driver:$sparkVersion") - private val uploadedJars = sparkConf.getOption("spark.kubernetes.driver.uploads.jars") + private val secretName = s"$SUBMISSION_APP_SECRET_PREFIX-$kubernetesAppId" + private val secretDirectory = s"$DRIVER_CONTAINER_SECRETS_BASE_DIR/$kubernetesAppId" + private val sslSecretsDirectory = s"$DRIVER_CONTAINER_SECRETS_BASE_DIR/$kubernetesAppId-ssl" + private val sslSecretsName = s"$SUBMISSION_SSL_SECRETS_PREFIX-$kubernetesAppId" + private val driverDockerImage = sparkConf.get(DRIVER_DOCKER_IMAGE) + private val uploadedJars = sparkConf.get(KUBERNETES_DRIVER_UPLOAD_JARS) private val uiPort = sparkConf.getInt("spark.ui.port", DEFAULT_UI_PORT) - private val driverLaunchTimeoutSecs = sparkConf.getTimeAsSeconds( - 
"spark.kubernetes.driverLaunchTimeout", s"${DEFAULT_LAUNCH_TIMEOUT_SECONDS}s") + private val driverSubmitTimeoutSecs = sparkConf.get(KUBERNETES_DRIVER_SUBMIT_TIMEOUT) private val secretBase64String = { val secretBytes = new Array[Byte](128) @@ -73,32 +71,27 @@ private[spark] class Client( Base64.encodeBase64String(secretBytes) } - private val serviceAccount = sparkConf.get("spark.kubernetes.submit.serviceAccountName", - "default") - - private val customLabels = sparkConf.get("spark.kubernetes.driver.labels", "") + private val serviceAccount = sparkConf.get(KUBERNETES_SERVICE_ACCOUNT_NAME) + private val customLabels = sparkConf.get(KUBERNETES_DRIVER_LABELS) private implicit val retryableExecutionContext = ExecutionContext .fromExecutorService( - Executors.newSingleThreadExecutor(new ThreadFactoryBuilder() - .setNameFormat("kubernetes-client-retryable-futures-%d") - .setDaemon(true) - .build())) + ThreadUtils.newDaemonSingleThreadExecutor("kubernetes-client-retryable-futures")) def run(): Unit = { - val (driverLaunchSslOptions, isKeyStoreLocalFile) = parseDriverLaunchSslOptions() + val (driverSubmitSslOptions, isKeyStoreLocalFile) = parseDriverSubmitSslOptions() val parsedCustomLabels = parseCustomLabels(customLabels) var k8ConfBuilder = new ConfigBuilder() .withApiVersion("v1") .withMasterUrl(master) .withNamespace(namespace) - sparkConf.getOption("spark.kubernetes.submit.caCertFile").foreach { + sparkConf.get(KUBERNETES_CA_CERT_FILE).foreach { f => k8ConfBuilder = k8ConfBuilder.withCaCertFile(f) } - sparkConf.getOption("spark.kubernetes.submit.clientKeyFile").foreach { + sparkConf.get(KUBERNETES_CLIENT_KEY_FILE).foreach { f => k8ConfBuilder = k8ConfBuilder.withClientKeyFile(f) } - sparkConf.getOption("spark.kubernetes.submit.clientCertFile").foreach { + sparkConf.get(KUBERNETES_CLIENT_CERT_FILE).foreach { f => k8ConfBuilder = k8ConfBuilder.withClientCertFile(f) } @@ -108,15 +101,16 @@ private[spark] class Client( .withNewMetadata() .withName(secretName) 
.endMetadata() - .withData(Map((SUBMISSION_SERVER_SECRET_NAME, secretBase64String)).asJava) + .withData(Map((SUBMISSION_APP_SECRET_NAME, secretBase64String)).asJava) .withType("Opaque") .done() val (sslEnvs, sslVolumes, sslVolumeMounts, sslSecrets) = configureSsl(kubernetesClient, - driverLaunchSslOptions, + driverSubmitSslOptions, isKeyStoreLocalFile) try { val driverKubernetesSelectors = (Map( - DRIVER_LAUNCHER_SELECTOR_LABEL -> driverLauncherSelectorValue, + SPARK_DRIVER_LABEL -> kubernetesAppId, + SPARK_APP_ID_LABEL -> kubernetesAppId, SPARK_APP_NAME_LABEL -> appName) ++ parsedCustomLabels).asJava val containerPorts = buildContainerPorts() @@ -126,7 +120,7 @@ private[spark] class Client( submitCompletedFuture, submitPending, kubernetesClient, - driverLaunchSslOptions, + driverSubmitSslOptions, Array(submitServerSecret) ++ sslSecrets, driverKubernetesSelectors) Utils.tryWithResource(kubernetesClient @@ -141,7 +135,7 @@ private[spark] class Client( .withNewSpec() .withRestartPolicy("OnFailure") .addNewVolume() - .withName(s"spark-submission-secret-volume") + .withName(SUBMISSION_APP_SECRET_VOLUME_NAME) .withNewSecret() .withSecretName(submitServerSecret.getMetadata.getName) .endSecret() @@ -149,22 +143,22 @@ private[spark] class Client( .addToVolumes(sslVolumes: _*) .withServiceAccount(serviceAccount) .addNewContainer() - .withName(DRIVER_LAUNCHER_CONTAINER_NAME) + .withName(DRIVER_CONTAINER_NAME) .withImage(driverDockerImage) .withImagePullPolicy("IfNotPresent") .addNewVolumeMount() - .withName("spark-submission-secret-volume") + .withName(SUBMISSION_APP_SECRET_VOLUME_NAME) .withMountPath(secretDirectory) .withReadOnly(true) .endVolumeMount() .addToVolumeMounts(sslVolumeMounts: _*) .addNewEnv() - .withName("SPARK_SUBMISSION_SECRET_LOCATION") - .withValue(s"$secretDirectory/$SUBMISSION_SERVER_SECRET_NAME") + .withName(ENV_SUBMISSION_SECRET_LOCATION) + .withValue(s"$secretDirectory/$SUBMISSION_APP_SECRET_NAME") .endEnv() .addNewEnv() - 
.withName("SPARK_DRIVER_LAUNCHER_SERVER_PORT") - .withValue(DRIVER_LAUNCHER_SERVICE_INTERNAL_PORT.toString) + .withName(ENV_SUBMISSION_SERVER_PORT) + .withValue(SUBMISSION_SERVER_PORT.toString) .endEnv() .addToEnv(sslEnvs: _*) .withPorts(containerPorts.asJava) @@ -173,7 +167,7 @@ private[spark] class Client( .done() var submitSucceeded = false try { - submitCompletedFuture.get(driverLaunchTimeoutSecs, TimeUnit.SECONDS) + submitCompletedFuture.get(driverSubmitTimeoutSecs, TimeUnit.SECONDS) submitSucceeded = true } catch { case e: TimeoutException => @@ -199,8 +193,8 @@ private[spark] class Client( } } - private def parseDriverLaunchSslOptions(): (SSLOptions, Boolean) = { - val maybeKeyStore = sparkConf.getOption("spark.ssl.kubernetes.driverlaunch.keyStore") + private def parseDriverSubmitSslOptions(): (SSLOptions, Boolean) = { + val maybeKeyStore = sparkConf.get(KUBERNETES_DRIVER_SUBMIT_KEYSTORE) val resolvedSparkConf = sparkConf.clone() val (isLocalKeyStore, resolvedKeyStore) = maybeKeyStore.map(keyStore => { val keyStoreURI = Utils.resolveURI(keyStore) @@ -214,30 +208,29 @@ private[spark] class Client( (isProvidedKeyStoreLocal, Option.apply(keyStoreURI.getPath)) }).getOrElse((true, Option.empty[String])) resolvedKeyStore.foreach { - resolvedSparkConf.set("spark.ssl.kubernetes.driverlaunch.keyStore", _) + resolvedSparkConf.set(KUBERNETES_DRIVER_SUBMIT_KEYSTORE, _) } - sparkConf.getOption("spark.ssl.kubernetes.driverlaunch.trustStore").foreach { trustStore => + sparkConf.get(KUBERNETES_DRIVER_SUBMIT_TRUSTSTORE).foreach { trustStore => val trustStoreURI = Utils.resolveURI(trustStore) trustStoreURI.getScheme match { case "file" | null => - resolvedSparkConf.set("spark.ssl.kubernetes.driverlaunch.trustStore", - trustStoreURI.getPath) + resolvedSparkConf.set(KUBERNETES_DRIVER_SUBMIT_TRUSTSTORE, trustStoreURI.getPath) case _ => throw new SparkException(s"Invalid trustStore URI $trustStore; trustStore URI" + " for submit server must have no scheme, or scheme file://") } } 
val securityManager = new SecurityManager(resolvedSparkConf) - (securityManager.getSSLOptions("kubernetes.driverlaunch"), isLocalKeyStore) + (securityManager.getSSLOptions(KUBERNETES_SUBMIT_SSL_NAMESPACE), isLocalKeyStore) } - private def configureSsl(kubernetesClient: KubernetesClient, driverLaunchSslOptions: SSLOptions, + private def configureSsl(kubernetesClient: KubernetesClient, driverSubmitSslOptions: SSLOptions, isKeyStoreLocalFile: Boolean): (Array[EnvVar], Array[Volume], Array[VolumeMount], Array[Secret]) = { - if (driverLaunchSslOptions.enabled) { + if (driverSubmitSslOptions.enabled) { val sslSecretsMap = mutable.HashMap[String, String]() val sslEnvs = mutable.Buffer[EnvVar]() val secrets = mutable.Buffer[Secret]() - driverLaunchSslOptions.keyStore.foreach(store => { + driverSubmitSslOptions.keyStore.foreach(store => { val resolvedKeyStoreFile = if (isKeyStoreLocalFile) { if (!store.isFile) { throw new SparkException(s"KeyStore specified at $store is not a file or" + @@ -245,40 +238,40 @@ private[spark] class Client( } val keyStoreBytes = Files.toByteArray(store) val keyStoreBase64 = Base64.encodeBase64String(keyStoreBytes) - sslSecretsMap += (SSL_KEYSTORE_SECRET_NAME -> keyStoreBase64) - s"$sslSecretsDirectory/$SSL_KEYSTORE_SECRET_NAME" + sslSecretsMap += (SUBMISSION_SSL_KEYSTORE_SECRET_NAME -> keyStoreBase64) + s"$sslSecretsDirectory/$SUBMISSION_SSL_KEYSTORE_SECRET_NAME" } else { store.getAbsolutePath } sslEnvs += new EnvVarBuilder() - .withName("SPARK_SUBMISSION_KEYSTORE_FILE") + .withName(ENV_SUBMISSION_KEYSTORE_FILE) .withValue(resolvedKeyStoreFile) .build() }) - driverLaunchSslOptions.keyStorePassword.foreach(password => { + driverSubmitSslOptions.keyStorePassword.foreach(password => { val passwordBase64 = Base64.encodeBase64String(password.getBytes(Charsets.UTF_8)) - sslSecretsMap += (SSL_KEYSTORE_PASSWORD_SECRET_NAME -> passwordBase64) + sslSecretsMap += (SUBMISSION_SSL_KEYSTORE_PASSWORD_SECRET_NAME -> passwordBase64) sslEnvs += new 
EnvVarBuilder() - .withName("SPARK_SUBMISSION_KEYSTORE_PASSWORD_FILE") - .withValue(s"$sslSecretsDirectory/$SSL_KEYSTORE_PASSWORD_SECRET_NAME") + .withName(ENV_SUBMISSION_KEYSTORE_PASSWORD_FILE) + .withValue(s"$sslSecretsDirectory/$SUBMISSION_SSL_KEYSTORE_PASSWORD_SECRET_NAME") .build() }) - driverLaunchSslOptions.keyPassword.foreach(password => { + driverSubmitSslOptions.keyPassword.foreach(password => { val passwordBase64 = Base64.encodeBase64String(password.getBytes(Charsets.UTF_8)) - sslSecretsMap += (SSL_KEY_PASSWORD_SECRET_NAME -> passwordBase64) + sslSecretsMap += (SUBMISSION_SSL_KEY_PASSWORD_SECRET_NAME -> passwordBase64) sslEnvs += new EnvVarBuilder() - .withName("SPARK_SUBMISSION_KEYSTORE_KEY_PASSWORD_FILE") - .withValue(s"$sslSecretsDirectory/$SSL_KEY_PASSWORD_SECRET_NAME") + .withName(ENV_SUBMISSION_KEYSTORE_KEY_PASSWORD_FILE) + .withValue(s"$sslSecretsDirectory/$SUBMISSION_SSL_KEY_PASSWORD_SECRET_NAME") .build() }) - driverLaunchSslOptions.keyStoreType.foreach(storeType => { + driverSubmitSslOptions.keyStoreType.foreach(storeType => { sslEnvs += new EnvVarBuilder() - .withName("SPARK_SUBMISSION_KEYSTORE_TYPE") + .withName(ENV_SUBMISSION_KEYSTORE_TYPE) .withValue(storeType) .build() }) sslEnvs += new EnvVarBuilder() - .withName("SPARK_SUBMISSION_USE_SSL") + .withName(ENV_SUBMISSION_USE_SSL) .withValue("true") .build() val sslSecrets = kubernetesClient.secrets().createNew() @@ -290,13 +283,13 @@ private[spark] class Client( .done() secrets += sslSecrets val sslVolume = new VolumeBuilder() - .withName("spark-submission-server-ssl-secrets") + .withName(SUBMISSION_SSL_SECRETS_VOLUME_NAME) .withNewSecret() .withSecretName(sslSecrets.getMetadata.getName) .endSecret() .build() val sslVolumeMount = new VolumeMountBuilder() - .withName("spark-submission-server-ssl-secrets") + .withName(SUBMISSION_SSL_SECRETS_VOLUME_NAME) .withReadOnly(true) .withMountPath(sslSecretsDirectory) .build() @@ -310,7 +303,7 @@ private[spark] class Client( submitCompletedFuture: 
SettableFuture[Boolean], submitPending: AtomicBoolean, kubernetesClient: KubernetesClient, - driverLaunchSslOptions: SSLOptions, + driverSubmitSslOptions: SSLOptions, applicationSecrets: Array[Secret], driverKubernetesSelectors: java.util.Map[String, String]) extends Watcher[Pod] { override def eventReceived(action: Action, pod: Pod): Unit = { @@ -322,7 +315,7 @@ private[spark] class Client( .getContainerStatuses .asScala .find(status => - status.getName == DRIVER_LAUNCHER_CONTAINER_NAME && status.getReady) match { + status.getName == DRIVER_CONTAINER_NAME && status.getReady) match { case Some(_) => val ownerRefs = Seq(new OwnerReferenceBuilder() .withName(pod.getMetadata.getName) @@ -337,10 +330,10 @@ private[spark] class Client( kubernetesClient.secrets().createOrReplace(secret) }) - val driverLauncherServicePort = new ServicePortBuilder() - .withName(DRIVER_LAUNCHER_SERVICE_PORT_NAME) - .withPort(DRIVER_LAUNCHER_SERVICE_INTERNAL_PORT) - .withNewTargetPort(DRIVER_LAUNCHER_SERVICE_INTERNAL_PORT) + val driverSubmissionServicePort = new ServicePortBuilder() + .withName(SUBMISSION_SERVER_PORT_NAME) + .withPort(SUBMISSION_SERVER_PORT) + .withNewTargetPort(SUBMISSION_SERVER_PORT) .build() val service = kubernetesClient.services().createNew() .withNewMetadata() @@ -351,20 +344,25 @@ private[spark] class Client( .withNewSpec() .withType("NodePort") .withSelector(driverKubernetesSelectors) - .withPorts(driverLauncherServicePort) + .withPorts(driverSubmissionServicePort) .endSpec() .done() try { - sparkConf.set("spark.kubernetes.driver.service.name", - service.getMetadata.getName) - sparkConf.set("spark.kubernetes.driver.pod.name", kubernetesAppId) + sparkConf.getOption("spark.app.id").foreach { id => + logWarning(s"Warning: Provided app id in spark.app.id as $id will be" + + s" overridden as $kubernetesAppId") + } + sparkConf.set(KUBERNETES_DRIVER_POD_NAME, kubernetesAppId) + sparkConf.set(KUBERNETES_DRIVER_SERVICE_NAME, service.getMetadata.getName) + 
sparkConf.set("spark.app.id", kubernetesAppId) + sparkConf.setIfMissing("spark.app.name", appName) sparkConf.setIfMissing("spark.driver.port", DEFAULT_DRIVER_PORT.toString) sparkConf.setIfMissing("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT.toString) - val driverLauncher = buildDriverLauncherClient(kubernetesClient, service, - driverLaunchSslOptions) + val driverSubmitter = buildDriverSubmissionClient(kubernetesClient, service, + driverSubmitSslOptions) val ping = Retry.retry(5, 5.seconds) { - driverLauncher.ping() + driverSubmitter.ping() } ping onFailure { case t: Throwable => @@ -375,7 +373,7 @@ private[spark] class Client( Future { sparkConf.set("spark.driver.host", pod.getStatus.getPodIP) val submitRequest = buildSubmissionRequest() - driverLauncher.create(submitRequest) + driverSubmitter.submitApplication(submitRequest) } } submitComplete onFailure { @@ -436,17 +434,17 @@ private[spark] class Client( kubernetesClient.pods().withName(kubernetesAppId).get() } catch { case throwable: Throwable => - logError(s"Timed out while waiting $driverLaunchTimeoutSecs seconds for the" + + logError(s"Timed out while waiting $driverSubmitTimeoutSecs seconds for the" + " driver pod to start, but an error occurred while fetching the driver" + " pod's details.", throwable) - throw new SparkException(s"Timed out while waiting $driverLaunchTimeoutSecs" + + throw new SparkException(s"Timed out while waiting $driverSubmitTimeoutSecs" + " seconds for the driver pod to start. Unfortunately, in attempting to fetch" + " the latest state of the pod, another error was thrown. Check the logs for" + " the error that was thrown in looking up the driver pod.", e) } val topLevelMessage = s"The driver pod with name ${driverPod.getMetadata.getName}" + s" in namespace ${driverPod.getMetadata.getNamespace} was not ready in" + - s" $driverLaunchTimeoutSecs seconds." + s" $driverSubmitTimeoutSecs seconds." 
val podStatusPhase = if (driverPod.getStatus.getPhase != null) { s"Latest phase from the pod is: ${driverPod.getStatus.getPhase}" } else { @@ -460,7 +458,7 @@ private[spark] class Client( val failedDriverContainerStatusString = driverPod.getStatus .getContainerStatuses .asScala - .find(_.getName == DRIVER_LAUNCHER_CONTAINER_NAME) + .find(_.getName == DRIVER_CONTAINER_NAME) .map(status => { val lastState = status.getState if (lastState.getRunning != null) { @@ -481,17 +479,21 @@ private[spark] class Client( "Driver container last state: Unknown" } }).getOrElse("The driver container wasn't found in the pod; expected to find" + - s" container with name $DRIVER_LAUNCHER_CONTAINER_NAME") + s" container with name $DRIVER_CONTAINER_NAME") s"$topLevelMessage\n" + s"$podStatusPhase\n" + s"$podStatusMessage\n\n$failedDriverContainerStatusString" } private def buildContainerPorts(): Seq[ContainerPort] = { - Seq(sparkConf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT), - sparkConf.getInt("spark.blockManager.port", DEFAULT_BLOCKMANAGER_PORT), - DRIVER_LAUNCHER_SERVICE_INTERNAL_PORT, - uiPort).map(new ContainerPortBuilder().withContainerPort(_).build()) + Seq((DRIVER_PORT_NAME, sparkConf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT)), + (BLOCK_MANAGER_PORT_NAME, + sparkConf.getInt("spark.blockManager.port", DEFAULT_BLOCKMANAGER_PORT)), + (SUBMISSION_SERVER_PORT_NAME, SUBMISSION_SERVER_PORT), + (UI_PORT_NAME, uiPort)).map(port => new ContainerPortBuilder() + .withName(port._1) + .withContainerPort(port._2) + .build()) } private def buildSubmissionRequest(): KubernetesCreateSubmissionRequest = { @@ -526,22 +528,22 @@ private[spark] class Client( .map(CompressionUtils.createTarGzip(_)) } - private def buildDriverLauncherClient( + private def buildDriverSubmissionClient( kubernetesClient: KubernetesClient, service: Service, - driverLaunchSslOptions: SSLOptions): KubernetesSparkRestApi = { + driverSubmitSslOptions: SSLOptions): KubernetesSparkRestApi = { val servicePort = 
service .getSpec .getPorts .asScala - .filter(_.getName == DRIVER_LAUNCHER_SERVICE_PORT_NAME) + .filter(_.getName == SUBMISSION_SERVER_PORT_NAME) .head .getNodePort // NodePort is exposed on every node, so just pick one of them. // TODO be resilient to node failures and try all of them val node = kubernetesClient.nodes.list.getItems.asScala.head val nodeAddress = node.getStatus.getAddresses.asScala.head.getAddress - val urlScheme = if (driverLaunchSslOptions.enabled) { + val urlScheme = if (driverSubmitSslOptions.enabled) { "https" } else { logWarning("Submitting application details, application secret, and local" + @@ -550,8 +552,8 @@ private[spark] class Client( "http" } val (trustManager, sslContext): (X509TrustManager, SSLContext) = - if (driverLaunchSslOptions.enabled) { - buildSslConnectionConfiguration(driverLaunchSslOptions) + if (driverSubmitSslOptions.enabled) { + buildSslConnectionConfiguration(driverSubmitSslOptions) } else { (null, SSLContext.getDefault) } @@ -562,18 +564,18 @@ private[spark] class Client( trustContext = trustManager) } - private def buildSslConnectionConfiguration(driverLaunchSslOptions: SSLOptions) = { - driverLaunchSslOptions.trustStore.map(trustStoreFile => { + private def buildSslConnectionConfiguration(driverSubmitSslOptions: SSLOptions) = { + driverSubmitSslOptions.trustStore.map(trustStoreFile => { val trustManagerFactory = TrustManagerFactory.getInstance( TrustManagerFactory.getDefaultAlgorithm) val trustStore = KeyStore.getInstance( - driverLaunchSslOptions.trustStoreType.getOrElse(KeyStore.getDefaultType)) + driverSubmitSslOptions.trustStoreType.getOrElse(KeyStore.getDefaultType)) if (!trustStoreFile.isFile) { throw new SparkException(s"TrustStore file at ${trustStoreFile.getAbsolutePath}" + s" does not exist or is not a file.") } Utils.tryWithResource(new FileInputStream(trustStoreFile)) { trustStoreStream => - driverLaunchSslOptions.trustStorePassword match { + driverSubmitSslOptions.trustStorePassword match { case 
Some(password) => trustStore.load(trustStoreStream, password.toCharArray) case None => trustStore.load(trustStoreStream, null) @@ -587,44 +589,29 @@ private[spark] class Client( }).getOrElse((null, SSLContext.getDefault)) } - private def parseCustomLabels(labels: String): Map[String, String] = { - labels.split(",").map(_.trim).filterNot(_.isEmpty).map(label => { - label.split("=", 2).toSeq match { - case Seq(k, v) => - require(k != DRIVER_LAUNCHER_SELECTOR_LABEL, "Label with key" + - s" $DRIVER_LAUNCHER_SELECTOR_LABEL cannot be used in" + - " spark.kubernetes.driver.labels, as it is reserved for Spark's" + - " internal configuration.") - (k, v) - case _ => - throw new SparkException("Custom labels set by spark.kubernetes.driver.labels" + - " must be a comma-separated list of key-value pairs, with format <key>=<value>." + - s" Got label: $label. All labels: $labels") - } - }).toMap + private def parseCustomLabels(maybeLabels: Option[String]): Map[String, String] = { + maybeLabels.map(labels => { + labels.split(",").map(_.trim).filterNot(_.isEmpty).map(label => { + label.split("=", 2).toSeq match { + case Seq(k, v) => + require(k != SPARK_APP_ID_LABEL, "Label with key" + + s" $SPARK_APP_ID_LABEL cannot be used in" + + " spark.kubernetes.driver.labels, as it is reserved for Spark's" + + " internal configuration.") + (k, v) + case _ => + throw new SparkException("Custom labels set by spark.kubernetes.driver.labels" + + " must be a comma-separated list of key-value pairs, with format <key>=<value>." + + s" Got label: $label. 
All labels: $labels") + } + }).toMap + }).getOrElse(Map.empty[String, String]) } } private[spark] object Client extends Logging { - private val SUBMISSION_SERVER_SECRET_NAME = "spark-submission-server-secret" - private val SSL_KEYSTORE_SECRET_NAME = "spark-submission-server-keystore" - private val SSL_KEYSTORE_PASSWORD_SECRET_NAME = "spark-submission-server-keystore-password" - private val SSL_KEY_PASSWORD_SECRET_NAME = "spark-submission-server-key-password" - private val DRIVER_LAUNCHER_SELECTOR_LABEL = "driver-launcher-selector" - private val DRIVER_LAUNCHER_SERVICE_INTERNAL_PORT = 7077 - private val DEFAULT_DRIVER_PORT = 7078 - private val DEFAULT_BLOCKMANAGER_PORT = 7079 - private val DEFAULT_UI_PORT = 4040 - private val UI_PORT_NAME = "spark-ui-port" - private val DRIVER_LAUNCHER_SERVICE_PORT_NAME = "driver-launcher-port" - private val DRIVER_PORT_NAME = "driver-port" - private val BLOCKMANAGER_PORT_NAME = "block-manager-port" - private val DRIVER_LAUNCHER_CONTAINER_NAME = "spark-kubernetes-driver-launcher" - private val SECURE_RANDOM = new SecureRandom() - private val SPARK_SUBMISSION_SECRET_BASE_DIR = "/var/run/secrets/spark-submission" - private val DEFAULT_LAUNCH_TIMEOUT_SECONDS = 60 - private val SPARK_APP_NAME_LABEL = "spark-app-name" + private[spark] val SECURE_RANDOM = new SecureRandom() def main(args: Array[String]): Unit = { require(args.length >= 2, s"Too few arguments. Usage: ${getClass.getName} " + diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala new file mode 100644 index 0000000000000..9b145370f87d6 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.kubernetes + +import java.util.concurrent.TimeUnit + +import org.apache.spark.{SPARK_VERSION => sparkVersion} +import org.apache.spark.internal.config.ConfigBuilder + +package object config { + + private[spark] val KUBERNETES_NAMESPACE = + ConfigBuilder("spark.kubernetes.namespace") + .doc(""" + | The namespace that will be used for running the driver and + | executor pods. When using spark-submit in cluster mode, + | this can also be passed to spark-submit via the + | --kubernetes-namespace command line argument. + """.stripMargin) + .stringConf + .createWithDefault("default") + + private[spark] val DRIVER_DOCKER_IMAGE = + ConfigBuilder("spark.kubernetes.driver.docker.image") + .doc(""" + | Docker image to use for the driver. Specify this using the + | standard Docker tag format. + """.stripMargin) + .stringConf + .createWithDefault(s"spark-driver:$sparkVersion") + + private[spark] val EXECUTOR_DOCKER_IMAGE = + ConfigBuilder("spark.kubernetes.executor.docker.image") + .doc(""" + | Docker image to use for the executors. Specify this using + | the standard Docker tag format. 
+ """.stripMargin) + .stringConf + .createWithDefault(s"spark-executor:$sparkVersion") + + private[spark] val KUBERNETES_CA_CERT_FILE = + ConfigBuilder("spark.kubernetes.submit.caCertFile") + .doc(""" + | CA cert file for connecting to Kubernetes over SSL. This + | file should be located on the submitting machine's disk. + """.stripMargin) + .stringConf + .createOptional + + private[spark] val KUBERNETES_CLIENT_KEY_FILE = + ConfigBuilder("spark.kubernetes.submit.clientKeyFile") + .doc(""" + | Client key file for authenticating against the Kubernetes + | API server. This file should be located on the submitting + | machine's disk. + """.stripMargin) + .stringConf + .createOptional + + private[spark] val KUBERNETES_CLIENT_CERT_FILE = + ConfigBuilder("spark.kubernetes.submit.clientCertFile") + .doc(""" + | Client cert file for authenticating against the + | Kubernetes API server. This file should be located on + | the submitting machine's disk. + """.stripMargin) + .stringConf + .createOptional + + private[spark] val KUBERNETES_SERVICE_ACCOUNT_NAME = + ConfigBuilder("spark.kubernetes.submit.serviceAccountName") + .doc(""" + | Service account that is used when running the driver pod. + | The driver pod uses this service account when requesting + | executor pods from the API server. + """.stripMargin) + .stringConf + .createWithDefault("default") + + private[spark] val KUBERNETES_DRIVER_UPLOAD_JARS = + ConfigBuilder("spark.kubernetes.driver.uploads.jars") + .doc(""" + | Comma-separated list of jars to send to the driver and + | all executors when submitting the application in cluster + | mode. + """.stripMargin) + .stringConf + .createOptional + + // Note that while we set a default for this when we start up the + // scheduler, the specific default value is dynamically determined + // based on the executor memory. 
+ private[spark] val KUBERNETES_EXECUTOR_MEMORY_OVERHEAD = + ConfigBuilder("spark.kubernetes.executor.memoryOverhead") + .doc(""" + | The amount of off-heap memory (in megabytes) to be + | allocated per executor. This is memory that accounts for + | things like VM overheads, interned strings, other native + | overheads, etc. This tends to grow with the executor size + | (typically 6-10%). + """.stripMargin) + .stringConf + .createOptional + + private[spark] val KUBERNETES_DRIVER_LABELS = + ConfigBuilder("spark.kubernetes.driver.labels") + .doc(""" + | Custom labels that will be added to the driver pod. + | This should be a comma-separated list of label key-value + | pairs, where each label is in the format key=value. Note + | that Spark also adds its own labels to the driver pod + | for bookkeeping purposes. + """.stripMargin) + .stringConf + .createOptional + + private[spark] val KUBERNETES_DRIVER_SUBMIT_TIMEOUT = + ConfigBuilder("spark.kubernetes.driverSubmitTimeout") + .doc(""" + | Time to wait for the driver process to start running + | before aborting its execution. + """.stripMargin) + .timeConf(TimeUnit.SECONDS) + .createWithDefault(60L) + + private[spark] val KUBERNETES_DRIVER_SUBMIT_KEYSTORE = + ConfigBuilder("spark.ssl.kubernetes.submit.keyStore") + .doc(""" + | KeyStore file for the driver submission server listening + | on SSL. Can be pre-mounted on the driver container + | or uploaded from the submitting client. + """.stripMargin) + .stringConf + .createOptional + + private[spark] val KUBERNETES_DRIVER_SUBMIT_TRUSTSTORE = + ConfigBuilder("spark.ssl.kubernetes.submit.trustStore") + .doc(""" + | TrustStore containing certificates for communicating + | to the driver submission server over SSL. + """.stripMargin) + .stringConf + .createOptional + + private[spark] val KUBERNETES_DRIVER_SERVICE_NAME = + ConfigBuilder("spark.kubernetes.driver.service.name") + .doc(""" + | Kubernetes service that exposes the driver pod + | for external access. 
+ """.stripMargin) + .internal() + .stringConf + .createOptional + + private[spark] val KUBERNETES_DRIVER_POD_NAME = + ConfigBuilder("spark.kubernetes.driver.pod.name") + .doc(""" + | Name of the driver pod. + """.stripMargin) + .internal() + .stringConf + .createOptional +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala new file mode 100644 index 0000000000000..027cc3c022b4e --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.kubernetes + +package object constants { + // Labels + private[spark] val SPARK_DRIVER_LABEL = "spark-driver" + private[spark] val SPARK_APP_ID_LABEL = "spark-app-id" + private[spark] val SPARK_APP_NAME_LABEL = "spark-app-name" + private[spark] val SPARK_EXECUTOR_ID_LABEL = "spark-exec-id" + + // Secrets + private[spark] val DRIVER_CONTAINER_SECRETS_BASE_DIR = "/var/run/secrets/spark-submission" + private[spark] val SUBMISSION_APP_SECRET_NAME = "spark-submission-server-secret" + private[spark] val SUBMISSION_APP_SECRET_PREFIX = "spark-submission-server-secret" + private[spark] val SUBMISSION_APP_SECRET_VOLUME_NAME = "spark-submission-secret-volume" + private[spark] val SUBMISSION_SSL_KEY_PASSWORD_SECRET_NAME = + "spark-submission-server-key-password" + private[spark] val SUBMISSION_SSL_KEYSTORE_PASSWORD_SECRET_NAME = + "spark-submission-server-keystore-password" + private[spark] val SUBMISSION_SSL_KEYSTORE_SECRET_NAME = "spark-submission-server-keystore" + private[spark] val SUBMISSION_SSL_SECRETS_PREFIX = "spark-submission-server-ssl" + private[spark] val SUBMISSION_SSL_SECRETS_VOLUME_NAME = "spark-submission-server-ssl-secrets" + + // Default and fixed ports + private[spark] val SUBMISSION_SERVER_PORT = 7077 + private[spark] val DEFAULT_DRIVER_PORT = 7078 + private[spark] val DEFAULT_BLOCKMANAGER_PORT = 7079 + private[spark] val DEFAULT_UI_PORT = 4040 + private[spark] val UI_PORT_NAME = "spark-ui-port" + private[spark] val SUBMISSION_SERVER_PORT_NAME = "submit-server" + private[spark] val BLOCK_MANAGER_PORT_NAME = "blockmanager" + private[spark] val DRIVER_PORT_NAME = "driver" + private[spark] val EXECUTOR_PORT_NAME = "executor" + + // Environment Variables + private[spark] val ENV_SUBMISSION_SECRET_LOCATION = "SPARK_SUBMISSION_SECRET_LOCATION" + private[spark] val ENV_SUBMISSION_SERVER_PORT = "SPARK_SUBMISSION_SERVER_PORT" + private[spark] val ENV_SUBMISSION_KEYSTORE_FILE = "SPARK_SUBMISSION_KEYSTORE_FILE" + private[spark] 
val ENV_SUBMISSION_KEYSTORE_PASSWORD_FILE = + "SPARK_SUBMISSION_KEYSTORE_PASSWORD_FILE" + private[spark] val ENV_SUBMISSION_KEYSTORE_KEY_PASSWORD_FILE = + "SPARK_SUBMISSION_KEYSTORE_KEY_PASSWORD_FILE" + private[spark] val ENV_SUBMISSION_KEYSTORE_TYPE = "SPARK_SUBMISSION_KEYSTORE_TYPE" + private[spark] val ENV_SUBMISSION_USE_SSL = "SPARK_SUBMISSION_USE_SSL" + private[spark] val ENV_EXECUTOR_PORT = "SPARK_EXECUTOR_PORT" + private[spark] val ENV_DRIVER_URL = "SPARK_DRIVER_URL" + private[spark] val ENV_EXECUTOR_CORES = "SPARK_EXECUTOR_CORES" + private[spark] val ENV_EXECUTOR_MEMORY = "SPARK_EXECUTOR_MEMORY" + private[spark] val ENV_APPLICATION_ID = "SPARK_APPLICATION_ID" + private[spark] val ENV_EXECUTOR_ID = "SPARK_EXECUTOR_ID" + + // Miscellaneous + private[spark] val DRIVER_CONTAINER_NAME = "spark-kubernetes-driver" + private[spark] val KUBERNETES_SUBMIT_SSL_NAMESPACE = "kubernetes.submit" +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/KubernetesRestProtocolMessages.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/KubernetesRestProtocolMessages.scala index 813d070e0f876..8beba23bc8e11 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/KubernetesRestProtocolMessages.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/KubernetesRestProtocolMessages.scala @@ -20,23 +20,22 @@ import com.fasterxml.jackson.annotation.{JsonSubTypes, JsonTypeInfo} import org.apache.spark.SPARK_VERSION -// TODO: jars should probably be compressed. Shipping tarballs would be optimal. 
case class KubernetesCreateSubmissionRequest( - val appResource: AppResource, - val mainClass: String, - val appArgs: Array[String], - val sparkProperties: Map[String, String], - val secret: String, - val uploadedJarsBase64Contents: Option[TarGzippedData]) extends SubmitRestProtocolRequest { + appResource: AppResource, + mainClass: String, + appArgs: Array[String], + sparkProperties: Map[String, String], + secret: String, + uploadedJarsBase64Contents: Option[TarGzippedData]) extends SubmitRestProtocolRequest { message = "create" clientSparkVersion = SPARK_VERSION } case class TarGzippedData( - val dataBase64: String, - val blockSize: Int = 10240, - val recordSize: Int = 512, - val encoding: String + dataBase64: String, + blockSize: Int = 10240, + recordSize: Int = 512, + encoding: String ) @JsonTypeInfo( diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestApi.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestApi.scala index 3cbcb16293b1d..18eb9b7a12ca6 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestApi.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestApi.scala @@ -28,12 +28,11 @@ trait KubernetesSparkRestApi { @Consumes(Array(MediaType.APPLICATION_JSON)) @Produces(Array(MediaType.APPLICATION_JSON)) @Path("/create") - def create(request: KubernetesCreateSubmissionRequest): CreateSubmissionResponse + def submitApplication(request: KubernetesCreateSubmissionRequest): CreateSubmissionResponse @GET @Consumes(Array(MediaType.APPLICATION_JSON)) @Produces(Array(MediaType.APPLICATION_JSON)) @Path("/ping") def ping(): PingResponse - } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala index dae4b2714b4e4..550ddd113fa42 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala @@ -21,17 +21,18 @@ import java.util.concurrent.Executors import java.util.concurrent.atomic.{AtomicInteger, AtomicLong} import com.google.common.util.concurrent.ThreadFactoryBuilder -import io.fabric8.kubernetes.api.model.{ContainerPort, ContainerPortBuilder, EnvVar, EnvVarBuilder, Pod, QuantityBuilder} +import io.fabric8.kubernetes.api.model.{ContainerPortBuilder, EnvVarBuilder, Pod, QuantityBuilder} import scala.collection.JavaConverters._ -import scala.collection.mutable.ArrayBuffer import scala.concurrent.{ExecutionContext, Future} import org.apache.spark.{SparkContext, SparkException} import org.apache.spark.deploy.kubernetes.{Client, KubernetesClientBuilder} +import org.apache.spark.deploy.kubernetes.config._ +import org.apache.spark.deploy.kubernetes.constants._ import org.apache.spark.rpc.RpcEndpointAddress import org.apache.spark.scheduler.TaskSchedulerImpl import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend -import org.apache.spark.util.Utils +import org.apache.spark.util.{ThreadUtils, Utils} private[spark] class KubernetesClusterSchedulerBackend( scheduler: TaskSchedulerImpl, @@ -44,24 +45,19 @@ private[spark] class KubernetesClusterSchedulerBackend( private val runningExecutorPods = new scala.collection.mutable.HashMap[String, Pod] private val kubernetesMaster = Client.resolveK8sMaster(sc.master) - - private val executorDockerImage = conf - .get("spark.kubernetes.executor.docker.image", s"spark-executor:${sc.version}") - - private val kubernetesNamespace = 
conf.get("spark.kubernetes.namespace", "default") - + private val executorDockerImage = conf.get(EXECUTOR_DOCKER_IMAGE) + private val kubernetesNamespace = conf.get(KUBERNETES_NAMESPACE) private val executorPort = conf.getInt("spark.executor.port", DEFAULT_STATIC_PORT) - private val blockmanagerPort = conf .getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT) private val kubernetesDriverServiceName = conf - .getOption("spark.kubernetes.driver.service.name") + .get(KUBERNETES_DRIVER_SERVICE_NAME) .getOrElse( throw new SparkException("Must specify the service name the driver is running with")) private val kubernetesDriverPodName = conf - .getOption("spark.kubernetes.driver.pod.name") + .get(KUBERNETES_DRIVER_POD_NAME) .getOrElse( throw new SparkException("Must specify the driver pod name")) @@ -69,7 +65,7 @@ private[spark] class KubernetesClusterSchedulerBackend( private val executorMemoryBytes = Utils.byteStringAsBytes(executorMemory) private val memoryOverheadBytes = conf - .getOption("spark.kubernetes.executor.memoryOverhead") + .get(KUBERNETES_EXECUTOR_MEMORY_OVERHEAD) .map(overhead => Utils.byteStringAsBytes(overhead)) .getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryBytes).toInt, MEMORY_OVERHEAD_MIN)) @@ -78,16 +74,12 @@ private[spark] class KubernetesClusterSchedulerBackend( private val executorCores = conf.getOption("spark.executor.cores").getOrElse("1") private implicit val requestExecutorContext = ExecutionContext.fromExecutorService( - Executors.newCachedThreadPool( - new ThreadFactoryBuilder() - .setDaemon(true) - .setNameFormat("kubernetes-executor-requests-%d") - .build)) + ThreadUtils.newDaemonCachedThreadPool("kubernetes-executor-requests")) private val kubernetesClient = KubernetesClientBuilder .buildFromWithinPod(kubernetesMaster, kubernetesNamespace) - val driverPod = try { + private val driverPod = try { kubernetesClient.pods().inNamespace(kubernetesNamespace). 
withName(kubernetesDriverPodName).get() } catch { @@ -127,6 +119,8 @@ private[spark] class KubernetesClusterSchedulerBackend( } } + override def applicationId(): String = conf.get("spark.app.id", super.applicationId()) + override def sufficientResourcesRegistered(): Boolean = { totalRegisteredExecutors.get() >= initialExecutors * minRegisteredRatio } @@ -163,9 +157,9 @@ private[spark] class KubernetesClusterSchedulerBackend( private def allocateNewExecutorPod(): (String, Pod) = { val executorKubernetesId = UUID.randomUUID().toString.replaceAll("-", "") val executorId = EXECUTOR_ID_COUNTER.incrementAndGet().toString - val name = s"$kubernetesDriverServiceName-exec-$executorKubernetesId" - val selectors = Map(SPARK_EXECUTOR_SELECTOR -> executorId, - SPARK_APP_SELECTOR -> applicationId()).asJava + val name = s"${applicationId()}-exec-$executorKubernetesId" + val selectors = Map(SPARK_EXECUTOR_ID_LABEL -> executorId, + SPARK_APP_ID_LABEL -> applicationId()).asJava val executorMemoryQuantity = new QuantityBuilder(false) .withAmount(executorMemoryBytes.toString) .build() @@ -175,69 +169,61 @@ private[spark] class KubernetesClusterSchedulerBackend( val executorCpuQuantity = new QuantityBuilder(false) .withAmount(executorCores) .build() - val requiredEnv = new ArrayBuffer[EnvVar] - requiredEnv += new EnvVarBuilder() - .withName("SPARK_EXECUTOR_PORT") - .withValue(executorPort.toString) - .build() - requiredEnv += new EnvVarBuilder() - .withName("SPARK_DRIVER_URL") - .withValue(driverUrl) - .build() - requiredEnv += new EnvVarBuilder() - .withName("SPARK_EXECUTOR_CORES") - .withValue(executorCores) - .build() - requiredEnv += new EnvVarBuilder() - .withName("SPARK_EXECUTOR_MEMORY") - .withValue(executorMemory) - .build() - requiredEnv += new EnvVarBuilder() - .withName("SPARK_APPLICATION_ID") - .withValue(applicationId()) - .build() - requiredEnv += new EnvVarBuilder() - .withName("SPARK_EXECUTOR_ID") - .withValue(executorId) - .build() - val requiredPorts = new 
ArrayBuffer[ContainerPort] - requiredPorts += new ContainerPortBuilder() - .withName(EXECUTOR_PORT_NAME) - .withContainerPort(executorPort) - .build() - requiredPorts += new ContainerPortBuilder() - .withName(BLOCK_MANAGER_PORT_NAME) - .withContainerPort(blockmanagerPort) - .build() - (executorKubernetesId, kubernetesClient.pods().createNew() - .withNewMetadata() - .withName(name) - .withLabels(selectors) - .withOwnerReferences() - .addNewOwnerReference() - .withController(true) - .withApiVersion(driverPod.getApiVersion) - .withKind(driverPod.getKind) - .withName(driverPod.getMetadata.getName) - .withUid(driverPod.getMetadata.getUid) - .endOwnerReference() - .endMetadata() - .withNewSpec() - .addNewContainer() - .withName(s"exec-${applicationId()}-container") - .withImage(executorDockerImage) - .withImagePullPolicy("IfNotPresent") - .withNewResources() - .addToRequests("memory", executorMemoryQuantity) - .addToLimits("memory", executorMemoryLimitQuantity) - .addToRequests("cpu", executorCpuQuantity) - .addToLimits("cpu", executorCpuQuantity) - .endResources() - .withEnv(requiredEnv.asJava) - .withPorts(requiredPorts.asJava) - .endContainer() - .endSpec() - .done()) + val requiredEnv = Seq( + (ENV_EXECUTOR_PORT, executorPort.toString), + (ENV_DRIVER_URL, driverUrl), + (ENV_EXECUTOR_CORES, executorCores), + (ENV_EXECUTOR_MEMORY, executorMemory), + (ENV_APPLICATION_ID, applicationId()), + (ENV_EXECUTOR_ID, executorId) + ).map(env => new EnvVarBuilder() + .withName(env._1) + .withValue(env._2) + .build()) + val requiredPorts = Seq( + (EXECUTOR_PORT_NAME, executorPort), + (BLOCK_MANAGER_PORT_NAME, blockmanagerPort)) + .map(port => { + new ContainerPortBuilder() + .withName(port._1) + .withContainerPort(port._2) + .build() + }) + try { + (executorKubernetesId, kubernetesClient.pods().createNew() + .withNewMetadata() + .withName(name) + .withLabels(selectors) + .withOwnerReferences() + .addNewOwnerReference() + .withController(true) + 
.withApiVersion(driverPod.getApiVersion) + .withKind(driverPod.getKind) + .withName(driverPod.getMetadata.getName) + .withUid(driverPod.getMetadata.getUid) + .endOwnerReference() + .endMetadata() + .withNewSpec() + .addNewContainer() + .withName(s"executor") + .withImage(executorDockerImage) + .withImagePullPolicy("IfNotPresent") + .withNewResources() + .addToRequests("memory", executorMemoryQuantity) + .addToLimits("memory", executorMemoryLimitQuantity) + .addToRequests("cpu", executorCpuQuantity) + .addToLimits("cpu", executorCpuQuantity) + .endResources() + .withEnv(requiredEnv.asJava) + .withPorts(requiredPorts.asJava) + .endContainer() + .endSpec() + .done()) + } catch { + case throwable: Throwable => + logError("Failed to allocate executor pod.", throwable) + throw throwable + } } override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = Future[Boolean] { @@ -269,13 +255,7 @@ private[spark] class KubernetesClusterSchedulerBackend( } private object KubernetesClusterSchedulerBackend { - private val SPARK_EXECUTOR_SELECTOR = "spark-exec" - private val SPARK_APP_SELECTOR = "spark-app" private val DEFAULT_STATIC_PORT = 10000 - private val DEFAULT_BLOCKMANAGER_PORT = 7079 - private val DEFAULT_DRIVER_PORT = 7078 - private val BLOCK_MANAGER_PORT_NAME = "blockmanager" - private val EXECUTOR_PORT_NAME = "executor" private val MEMORY_OVERHEAD_FACTOR = 0.10 private val MEMORY_OVERHEAD_MIN = 384L private val EXECUTOR_ID_COUNTER = new AtomicLong(0L) diff --git a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver/Dockerfile b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver/Dockerfile index 070008fce7410..92fdfb8ac5f41 100644 --- a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver/Dockerfile +++ b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver/Dockerfile @@ -27,6 +27,6 @@ CMD SSL_ARGS="" && \ if ! 
[ -z ${SPARK_SUBMISSION_KEYSTORE_KEY_PASSWORD_FILE+x} ]; then SSL_ARGS="$SSL_ARGS --keystore-key-password-file $SPARK_SUBMISSION_KEYSTORE_KEY_PASSWORD_FILE"; fi && \ exec bin/spark-class org.apache.spark.deploy.rest.kubernetes.KubernetesSparkRestServer \ --hostname $HOSTNAME \ - --port $SPARK_DRIVER_LAUNCHER_SERVER_PORT \ + --port $SPARK_SUBMISSION_SERVER_PORT \ --secret-file $SPARK_SUBMISSION_SECRET_LOCATION \ ${SSL_ARGS} diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala index 13edea02dce9a..16de71118dec4 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala @@ -172,7 +172,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { .set("spark.executor.memory", "500m") .set("spark.executor.cores", "1") .set("spark.executors.instances", "1") - .set("spark.app.id", "spark-pi") + .set("spark.app.name", "spark-pi") .set("spark.ui.enabled", "true") .set("spark.testing", "false") val mainAppResource = s"file://$EXAMPLES_JAR" @@ -298,11 +298,11 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { .getLabels // We can't match all of the selectors directly since one of the selectors is based on the // launch time. 
- assert(driverPodLabels.size == 4, "Unexpected number of pod labels.") - assert(driverPodLabels.containsKey("driver-launcher-selector"), "Expected driver launcher" + - " selector label to be present.") + assert(driverPodLabels.size == 5, "Unexpected number of pod labels.") assert(driverPodLabels.get("spark-app-name") == "spark-pi", "Unexpected value for" + " spark-app-name label.") + assert(driverPodLabels.get("spark-app-id").startsWith("spark-pi"), "Unexpected value for" + + " spark-app-id label (should be prefixed with the app name).") assert(driverPodLabels.get("label1") == "label1value", "Unexpected value for label1") assert(driverPodLabels.get("label2") == "label2value", "Unexpected value for label2") } @@ -323,12 +323,12 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { "--conf", s"spark.kubernetes.submit.clientCertFile=${clientConfig.getClientCertFile}", "--conf", "spark.kubernetes.executor.docker.image=spark-executor:latest", "--conf", "spark.kubernetes.driver.docker.image=spark-driver:latest", - "--conf", "spark.ssl.kubernetes.driverlaunch.enabled=true", - "--conf", "spark.ssl.kubernetes.driverlaunch.keyStore=" + + "--conf", "spark.ssl.kubernetes.submit.enabled=true", + "--conf", "spark.ssl.kubernetes.submit.keyStore=" + s"file://${keyStoreFile.getAbsolutePath}", - "--conf", "spark.ssl.kubernetes.driverlaunch.keyStorePassword=changeit", - "--conf", "spark.ssl.kubernetes.driverlaunch.keyPassword=changeit", - "--conf", "spark.ssl.kubernetes.driverlaunch.trustStore=" + + "--conf", "spark.ssl.kubernetes.submit.keyStorePassword=changeit", + "--conf", "spark.ssl.kubernetes.submit.keyPassword=changeit", + "--conf", "spark.ssl.kubernetes.submit.trustStore=" + s"file://${trustStoreFile.getAbsolutePath}", "--conf", s"spark.ssl.kubernetes.submit.trustStorePassword=changeit", EXAMPLES_JAR)