diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
index a8b8de8e34d03..7e6c34e12facd 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
@@ -38,7 +38,7 @@ import org.apache.spark.deploy.kubernetes.constants._
 import org.apache.spark.deploy.rest.{AppResource, ContainerAppResource, KubernetesCreateSubmissionRequest, RemoteAppResource, UploadedAppResource}
 import org.apache.spark.deploy.rest.kubernetes._
 import org.apache.spark.internal.Logging
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ShutdownHookManager, Utils}
 
 private[spark] class Client(
     sparkConf: SparkConf,
@@ -79,6 +79,8 @@ private[spark] class Client(
   private val serviceAccount = sparkConf.get(KUBERNETES_SERVICE_ACCOUNT_NAME)
   private val customLabels = sparkConf.get(KUBERNETES_DRIVER_LABELS)
 
+  private val kubernetesResourceCleaner = new KubernetesResourceCleaner
+
   def run(): Unit = {
     logInfo(s"Starting application $kubernetesAppId in Kubernetes...")
     val submitterLocalFiles = KubernetesFileUtils.getOnlySubmitterLocalFiles(sparkFiles)
@@ -111,6 +113,8 @@ private[spark] class Client(
 
     val k8ClientConfig = k8ConfBuilder.build
     Utils.tryWithResource(new DefaultKubernetesClient(k8ClientConfig)) { kubernetesClient =>
+      ShutdownHookManager.addShutdownHook(() =>
+        kubernetesResourceCleaner.deleteAllRegisteredResourcesFromKubernetes(kubernetesClient))
       val submitServerSecret = kubernetesClient.secrets().createNew()
         .withNewMetadata()
           .withName(secretName)
@@ -118,90 +122,64 @@ private[spark] class Client(
         .withData(Map((SUBMISSION_APP_SECRET_NAME, secretBase64String)).asJava)
         .withType("Opaque")
         .done()
+      kubernetesResourceCleaner.registerOrUpdateResource(submitServerSecret)
       try {
-        val (sslEnvs, sslVolumes, sslVolumeMounts, sslSecrets) = configureSsl(kubernetesClient,
+        val (sslEnvs, sslVolumes, sslVolumeMounts, sslSecrets) = configureSsl(
+          kubernetesClient,
           driverSubmitSslOptions,
           isKeyStoreLocalFile)
-        try {
-          // start outer watch for status logging of driver pod
-          val driverPodCompletedLatch = new CountDownLatch(1)
-          // only enable interval logging if in waitForAppCompletion mode
-          val loggingInterval = if (waitForAppCompletion) sparkConf.get(REPORT_INTERVAL) else 0
-          val loggingWatch = new LoggingPodStatusWatcher(driverPodCompletedLatch, kubernetesAppId,
-            loggingInterval)
-          Utils.tryWithResource(kubernetesClient
-            .pods()
-            .withName(kubernetesAppId)
-            .watch(loggingWatch)) { _ =>
-            val (driverPod, driverService) = launchDriverKubernetesComponents(
-              kubernetesClient,
-              parsedCustomLabels,
-              submitServerSecret,
-              driverSubmitSslOptions,
-              sslSecrets,
-              sslVolumes,
-              sslVolumeMounts,
-              sslEnvs,
-              isKeyStoreLocalFile)
-            val ownerReferenceConfiguredDriverService = try {
-              configureOwnerReferences(
-                kubernetesClient,
-                submitServerSecret,
-                sslSecrets,
-                driverPod,
-                driverService)
-            } catch {
-              case e: Throwable =>
-                cleanupPodAndService(kubernetesClient, driverPod, driverService)
-                throw new SparkException("Failed to set owner references to the driver pod.", e)
-            }
-            try {
-              submitApplicationToDriverServer(kubernetesClient, driverSubmitSslOptions,
-                ownerReferenceConfiguredDriverService, submitterLocalFiles, submitterLocalJars)
-              // wait if configured to do so
-              if (waitForAppCompletion) {
-                logInfo(s"Waiting for application $kubernetesAppId to finish...")
-                driverPodCompletedLatch.await()
-                logInfo(s"Application $kubernetesAppId finished.")
-              } else {
-                logInfo(s"Application $kubernetesAppId successfully launched.")
-              }
-            } catch {
-              case e: Throwable =>
-                cleanupPodAndService(kubernetesClient, driverPod,
-                  ownerReferenceConfiguredDriverService)
-                throw new SparkException("Failed to submit the application to the driver pod.", e)
-            }
-          }
-        } finally {
-          Utils.tryLogNonFatalError {
-            // Secrets may have been mutated so delete by name to avoid problems with not having
-            // the latest version.
-            sslSecrets.foreach { secret =>
-              kubernetesClient.secrets().withName(secret.getMetadata.getName).delete()
-            }
+        // start outer watch for status logging of driver pod
+        val driverPodCompletedLatch = new CountDownLatch(1)
+        // only enable interval logging if in waitForAppCompletion mode
+        val loggingInterval = if (waitForAppCompletion) sparkConf.get(REPORT_INTERVAL) else 0
+        val loggingWatch = new LoggingPodStatusWatcher(driverPodCompletedLatch, kubernetesAppId,
+          loggingInterval)
+        Utils.tryWithResource(kubernetesClient
+          .pods()
+          .withName(kubernetesAppId)
+          .watch(loggingWatch)) { _ =>
+          val (driverPod, driverService) = launchDriverKubernetesComponents(
+            kubernetesClient,
+            parsedCustomLabels,
+            submitServerSecret,
+            driverSubmitSslOptions,
+            sslSecrets,
+            sslVolumes,
+            sslVolumeMounts,
+            sslEnvs,
+            isKeyStoreLocalFile)
+          configureOwnerReferences(
+            kubernetesClient,
+            submitServerSecret,
+            sslSecrets,
+            driverPod,
+            driverService)
+          submitApplicationToDriverServer(
+            kubernetesClient,
+            driverSubmitSslOptions,
+            driverService,
+            submitterLocalFiles,
+            submitterLocalJars)
+          // Now that the application has started, persist the components that were created beyond
+          // the shutdown hook. We still want to purge the one-time secrets, so do not unregister
+          // those.
+          kubernetesResourceCleaner.unregisterResource(driverPod)
+          kubernetesResourceCleaner.unregisterResource(driverService)
+          // wait if configured to do so
+          if (waitForAppCompletion) {
+            logInfo(s"Waiting for application $kubernetesAppId to finish...")
+            driverPodCompletedLatch.await()
+            logInfo(s"Application $kubernetesAppId finished.")
+          } else {
+            logInfo(s"Application $kubernetesAppId successfully launched.")
           }
         }
       } finally {
-        Utils.tryLogNonFatalError {
-          kubernetesClient.secrets().withName(submitServerSecret.getMetadata.getName).delete()
-        }
+        kubernetesResourceCleaner.deleteAllRegisteredResourcesFromKubernetes(kubernetesClient)
       }
     }
   }
 
-  private def cleanupPodAndService(
-      kubernetesClient: KubernetesClient,
-      driverPod: Pod,
-      driverService: Service): Unit = {
-    Utils.tryLogNonFatalError {
-      kubernetesClient.services().delete(driverService)
-    }
-    Utils.tryLogNonFatalError {
-      kubernetesClient.pods().delete(driverPod)
-    }
-  }
-
   private def submitApplicationToDriverServer(
       kubernetesClient: KubernetesClient,
       driverSubmitSslOptions: SSLOptions,
@@ -237,11 +215,13 @@ private[spark] class Client(
       .withPort(uiPort)
       .withNewTargetPort(uiPort)
       .build()
-    kubernetesClient.services().withName(kubernetesAppId).edit().editSpec()
-      .withType(uiServiceType)
-      .withPorts(uiServicePort)
-      .endSpec()
+    val resolvedService = kubernetesClient.services().withName(kubernetesAppId).edit()
+      .editSpec()
+        .withType(uiServiceType)
+        .withPorts(uiServicePort)
+        .endSpec()
       .done()
+    kubernetesResourceCleaner.registerOrUpdateResource(resolvedService)
     logInfo("Finished submitting application to Kubernetes.")
   }
 
@@ -282,37 +262,19 @@ private[spark] class Client(
           kubernetesClient,
           driverKubernetesSelectors,
           submitServerSecret)
-        val driverPod = try {
-          createDriverPod(
-            kubernetesClient,
-            driverKubernetesSelectors,
-            submitServerSecret,
-            driverSubmitSslOptions,
-            sslVolumes,
-            sslVolumeMounts,
-            sslEnvs)
-        } catch {
-          case e: Throwable =>
-            Utils.tryLogNonFatalError {
-              kubernetesClient.services().delete(driverService)
-            }
-            throw new SparkException("Failed to create the driver pod.", e)
-        }
-        try {
-          waitForReadyKubernetesComponents(kubernetesClient, endpointsReadyFuture,
-            serviceReadyFuture, podReadyFuture)
-          (driverPod, driverService)
-        } catch {
-          case e: Throwable =>
-            Utils.tryLogNonFatalError {
-              kubernetesClient.services().delete(driverService)
-            }
-            Utils.tryLogNonFatalError {
-              kubernetesClient.pods().delete(driverPod)
-            }
-            throw new SparkException("Timed out while waiting for a Kubernetes component to be" +
-              " ready.", e)
-        }
+        kubernetesResourceCleaner.registerOrUpdateResource(driverService)
+        val driverPod = createDriverPod(
+          kubernetesClient,
+          driverKubernetesSelectors,
+          submitServerSecret,
+          driverSubmitSslOptions,
+          sslVolumes,
+          sslVolumeMounts,
+          sslEnvs)
+        kubernetesResourceCleaner.registerOrUpdateResource(driverPod)
+        waitForReadyKubernetesComponents(kubernetesClient, endpointsReadyFuture,
+          serviceReadyFuture, podReadyFuture)
+        (driverPod, driverService)
       }
     }
   }
@@ -338,22 +300,32 @@ private[spark] class Client(
       .withController(true)
       .build()
     sslSecrets.foreach(secret => {
-      kubernetesClient.secrets().withName(secret.getMetadata.getName).edit()
+      val updatedSecret = kubernetesClient.secrets().withName(secret.getMetadata.getName).edit()
         .editMetadata()
         .addToOwnerReferences(driverPodOwnerRef)
         .endMetadata()
         .done()
+      kubernetesResourceCleaner.registerOrUpdateResource(updatedSecret)
     })
-    kubernetesClient.secrets().withName(submitServerSecret.getMetadata.getName).edit()
-      .editMetadata()
-      .addToOwnerReferences(driverPodOwnerRef)
-      .endMetadata()
-      .done()
-    kubernetesClient.services().withName(driverService.getMetadata.getName).edit()
-      .editMetadata()
-      .addToOwnerReferences(driverPodOwnerRef)
-      .endMetadata()
-      .done()
+    val updatedSubmitServerSecret = kubernetesClient
+      .secrets()
+      .withName(submitServerSecret.getMetadata.getName)
+      .edit()
+      .editMetadata()
+      .addToOwnerReferences(driverPodOwnerRef)
+      .endMetadata()
+      .done()
+    kubernetesResourceCleaner.registerOrUpdateResource(updatedSubmitServerSecret)
+    val updatedService = kubernetesClient
+      .services()
+      .withName(driverService.getMetadata.getName)
+      .edit()
+      .editMetadata()
+      .addToOwnerReferences(driverPodOwnerRef)
+      .endMetadata()
+      .done()
+    kubernetesResourceCleaner.registerOrUpdateResource(updatedService)
+    updatedService
   }
 
   private def waitForReadyKubernetesComponents(
@@ -417,7 +389,7 @@ private[spark] class Client(
       driverSubmitSslOptions: SSLOptions,
       sslVolumes: Array[Volume],
       sslVolumeMounts: Array[VolumeMount],
-      sslEnvs: Array[EnvVar]) = {
+      sslEnvs: Array[EnvVar]): Pod = {
     val containerPorts = buildContainerPorts()
     val probePingHttpGet = new HTTPGetActionBuilder()
       .withScheme(if (driverSubmitSslOptions.enabled) "HTTPS" else "HTTP")
@@ -537,9 +509,11 @@ private[spark] class Client(
     (securityManager.getSSLOptions(KUBERNETES_SUBMIT_SSL_NAMESPACE), isLocalKeyStore)
   }
 
-  private def configureSsl(kubernetesClient: KubernetesClient, driverSubmitSslOptions: SSLOptions,
-    isKeyStoreLocalFile: Boolean):
-    (Array[EnvVar], Array[Volume], Array[VolumeMount], Array[Secret]) = {
+  private def configureSsl(
+      kubernetesClient: KubernetesClient,
+      driverSubmitSslOptions: SSLOptions,
+      isKeyStoreLocalFile: Boolean):
+      (Array[EnvVar], Array[Volume], Array[VolumeMount], Array[Secret]) = {
     if (driverSubmitSslOptions.enabled) {
       val sslSecretsMap = mutable.HashMap[String, String]()
       val sslEnvs = mutable.Buffer[EnvVar]()
@@ -606,6 +580,7 @@ private[spark] class Client(
         .withData(sslSecretsMap.asJava)
         .withType("Opaque")
         .done()
+      kubernetesResourceCleaner.registerOrUpdateResource(sslSecrets)
       secrets += sslSecrets
       (sslEnvs.toArray, Array(sslVolume), Array(sslVolumeMount), secrets.toArray)
     } else {
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesResourceCleaner.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesResourceCleaner.scala
new file mode 100644
index 0000000000000..fb76b04604479
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesResourceCleaner.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.kubernetes
+
+import io.fabric8.kubernetes.api.model.HasMetadata
+import io.fabric8.kubernetes.client.KubernetesClient
+import scala.collection.mutable
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+private[spark] class KubernetesResourceCleaner
+  extends Logging {
+
+  private val resources = mutable.HashMap.empty[(String, String), HasMetadata]
+
+  // Synchronized because deleteAllRegisteredResourcesFromKubernetes may be called from a
+  // shutdown hook
+  def registerOrUpdateResource(resource: HasMetadata): Unit = synchronized {
+    resources.put((resource.getMetadata.getName, resource.getKind), resource)
+  }
+
+  def unregisterResource(resource: HasMetadata): Unit = synchronized {
+    resources.remove((resource.getMetadata.getName, resource.getKind))
+  }
+
+  def deleteAllRegisteredResourcesFromKubernetes(kubernetesClient: KubernetesClient): Unit = {
+    synchronized {
+      logInfo(s"Deleting ${resources.size} registered Kubernetes resources:")
+      resources.values.foreach { resource =>
+        Utils.tryLogNonFatalError {
+          kubernetesClient.resource(resource).delete()
+        }
+      }
+      resources.clear()
+    }
+  }
+}