Skip to content

Commit

Permalink
Use a separate class to track components that need to be cleaned up (a…
Browse files Browse the repository at this point in the history
…pache#122)

* Refactor the cleaning up of Kubernetes components.

Create a KubernetesComponentsCleaner which can register arbitrary pods,
services, secrets, and ingresses. When an exception is thrown or the JVM
shuts down, the cleaner automatically purges any of its registered
components from Kubernetes. The components can be unregistered when the
driver successfully begins running, so that the application persists
beyond the lifetime of the spark-submit process.

* Fix spacing

* Address comments

* Fix compiler error

* Pull KubernetesComponentCleaner into instance variable

* Remove a parameter

* Remove redundant registerOrUpdateSecret for SSL

* Remove Ingresses from component cleaner

* Clear resources generically as opposed to specifying each type

* Remove incorrect test assertion

* Rename variable
  • Loading branch information
mccheah authored and foxish committed Jul 24, 2017
1 parent 6d179a6 commit e8359ca
Show file tree
Hide file tree
Showing 2 changed files with 152 additions and 125 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import org.apache.spark.deploy.kubernetes.constants._
import org.apache.spark.deploy.rest.{AppResource, ContainerAppResource, KubernetesCreateSubmissionRequest, RemoteAppResource, UploadedAppResource}
import org.apache.spark.deploy.rest.kubernetes._
import org.apache.spark.internal.Logging
import org.apache.spark.util.Utils
import org.apache.spark.util.{ShutdownHookManager, Utils}

private[spark] class Client(
sparkConf: SparkConf,
Expand Down Expand Up @@ -79,6 +79,8 @@ private[spark] class Client(
private val serviceAccount = sparkConf.get(KUBERNETES_SERVICE_ACCOUNT_NAME)
private val customLabels = sparkConf.get(KUBERNETES_DRIVER_LABELS)

private val kubernetesResourceCleaner = new KubernetesResourceCleaner

def run(): Unit = {
logInfo(s"Starting application $kubernetesAppId in Kubernetes...")
val submitterLocalFiles = KubernetesFileUtils.getOnlySubmitterLocalFiles(sparkFiles)
Expand Down Expand Up @@ -111,97 +113,73 @@ private[spark] class Client(

val k8ClientConfig = k8ConfBuilder.build
Utils.tryWithResource(new DefaultKubernetesClient(k8ClientConfig)) { kubernetesClient =>
ShutdownHookManager.addShutdownHook(() =>
kubernetesResourceCleaner.deleteAllRegisteredResourcesFromKubernetes(kubernetesClient))
val submitServerSecret = kubernetesClient.secrets().createNew()
.withNewMetadata()
.withName(secretName)
.endMetadata()
.withData(Map((SUBMISSION_APP_SECRET_NAME, secretBase64String)).asJava)
.withType("Opaque")
.done()
kubernetesResourceCleaner.registerOrUpdateResource(submitServerSecret)
try {
val (sslEnvs, sslVolumes, sslVolumeMounts, sslSecrets) = configureSsl(kubernetesClient,
val (sslEnvs, sslVolumes, sslVolumeMounts, sslSecrets) = configureSsl(
kubernetesClient,
driverSubmitSslOptions,
isKeyStoreLocalFile)
try {
// start outer watch for status logging of driver pod
val driverPodCompletedLatch = new CountDownLatch(1)
// only enable interval logging if in waitForAppCompletion mode
val loggingInterval = if (waitForAppCompletion) sparkConf.get(REPORT_INTERVAL) else 0
val loggingWatch = new LoggingPodStatusWatcher(driverPodCompletedLatch, kubernetesAppId,
loggingInterval)
Utils.tryWithResource(kubernetesClient
.pods()
.withName(kubernetesAppId)
.watch(loggingWatch)) { _ =>
val (driverPod, driverService) = launchDriverKubernetesComponents(
kubernetesClient,
parsedCustomLabels,
submitServerSecret,
driverSubmitSslOptions,
sslSecrets,
sslVolumes,
sslVolumeMounts,
sslEnvs,
isKeyStoreLocalFile)
val ownerReferenceConfiguredDriverService = try {
configureOwnerReferences(
kubernetesClient,
submitServerSecret,
sslSecrets,
driverPod,
driverService)
} catch {
case e: Throwable =>
cleanupPodAndService(kubernetesClient, driverPod, driverService)
throw new SparkException("Failed to set owner references to the driver pod.", e)
}
try {
submitApplicationToDriverServer(kubernetesClient, driverSubmitSslOptions,
ownerReferenceConfiguredDriverService, submitterLocalFiles, submitterLocalJars)
// wait if configured to do so
if (waitForAppCompletion) {
logInfo(s"Waiting for application $kubernetesAppId to finish...")
driverPodCompletedLatch.await()
logInfo(s"Application $kubernetesAppId finished.")
} else {
logInfo(s"Application $kubernetesAppId successfully launched.")
}
} catch {
case e: Throwable =>
cleanupPodAndService(kubernetesClient, driverPod,
ownerReferenceConfiguredDriverService)
throw new SparkException("Failed to submit the application to the driver pod.", e)
}
}
} finally {
Utils.tryLogNonFatalError {
// Secrets may have been mutated so delete by name to avoid problems with not having
// the latest version.
sslSecrets.foreach { secret =>
kubernetesClient.secrets().withName(secret.getMetadata.getName).delete()
}
// start outer watch for status logging of driver pod
val driverPodCompletedLatch = new CountDownLatch(1)
// only enable interval logging if in waitForAppCompletion mode
val loggingInterval = if (waitForAppCompletion) sparkConf.get(REPORT_INTERVAL) else 0
val loggingWatch = new LoggingPodStatusWatcher(driverPodCompletedLatch, kubernetesAppId,
loggingInterval)
Utils.tryWithResource(kubernetesClient
.pods()
.withName(kubernetesAppId)
.watch(loggingWatch)) { _ =>
val (driverPod, driverService) = launchDriverKubernetesComponents(
kubernetesClient,
parsedCustomLabels,
submitServerSecret,
driverSubmitSslOptions,
sslSecrets,
sslVolumes,
sslVolumeMounts,
sslEnvs,
isKeyStoreLocalFile)
configureOwnerReferences(
kubernetesClient,
submitServerSecret,
sslSecrets,
driverPod,
driverService)
submitApplicationToDriverServer(
kubernetesClient,
driverSubmitSslOptions,
driverService,
submitterLocalFiles,
submitterLocalJars)
// Now that the application has started, unregister the driver pod and service so that the
// resource cleaner does not delete them when this process finishes or shuts down. The
// one-time secrets stay registered because we still want to purge them.
kubernetesResourceCleaner.unregisterResource(driverPod)
kubernetesResourceCleaner.unregisterResource(driverService)
// wait if configured to do so
if (waitForAppCompletion) {
logInfo(s"Waiting for application $kubernetesAppId to finish...")
driverPodCompletedLatch.await()
logInfo(s"Application $kubernetesAppId finished.")
} else {
logInfo(s"Application $kubernetesAppId successfully launched.")
}
}
} finally {
Utils.tryLogNonFatalError {
kubernetesClient.secrets().withName(submitServerSecret.getMetadata.getName).delete()
}
kubernetesResourceCleaner.deleteAllRegisteredResourcesFromKubernetes(kubernetesClient)
}
}
}

private def cleanupPodAndService(
kubernetesClient: KubernetesClient,
driverPod: Pod,
driverService: Service): Unit = {
Utils.tryLogNonFatalError {
kubernetesClient.services().delete(driverService)
}
Utils.tryLogNonFatalError {
kubernetesClient.pods().delete(driverPod)
}
}

private def submitApplicationToDriverServer(
kubernetesClient: KubernetesClient,
driverSubmitSslOptions: SSLOptions,
Expand Down Expand Up @@ -237,11 +215,13 @@ private[spark] class Client(
.withPort(uiPort)
.withNewTargetPort(uiPort)
.build()
kubernetesClient.services().withName(kubernetesAppId).edit().editSpec()
.withType(uiServiceType)
.withPorts(uiServicePort)
.endSpec()
val resolvedService = kubernetesClient.services().withName(kubernetesAppId).edit()
.editSpec()
.withType(uiServiceType)
.withPorts(uiServicePort)
.endSpec()
.done()
kubernetesResourceCleaner.registerOrUpdateResource(resolvedService)
logInfo("Finished submitting application to Kubernetes.")
}

Expand Down Expand Up @@ -282,37 +262,19 @@ private[spark] class Client(
kubernetesClient,
driverKubernetesSelectors,
submitServerSecret)
val driverPod = try {
createDriverPod(
kubernetesClient,
driverKubernetesSelectors,
submitServerSecret,
driverSubmitSslOptions,
sslVolumes,
sslVolumeMounts,
sslEnvs)
} catch {
case e: Throwable =>
Utils.tryLogNonFatalError {
kubernetesClient.services().delete(driverService)
}
throw new SparkException("Failed to create the driver pod.", e)
}
try {
waitForReadyKubernetesComponents(kubernetesClient, endpointsReadyFuture,
serviceReadyFuture, podReadyFuture)
(driverPod, driverService)
} catch {
case e: Throwable =>
Utils.tryLogNonFatalError {
kubernetesClient.services().delete(driverService)
}
Utils.tryLogNonFatalError {
kubernetesClient.pods().delete(driverPod)
}
throw new SparkException("Timed out while waiting for a Kubernetes component to be" +
" ready.", e)
}
kubernetesResourceCleaner.registerOrUpdateResource(driverService)
val driverPod = createDriverPod(
kubernetesClient,
driverKubernetesSelectors,
submitServerSecret,
driverSubmitSslOptions,
sslVolumes,
sslVolumeMounts,
sslEnvs)
kubernetesResourceCleaner.registerOrUpdateResource(driverPod)
waitForReadyKubernetesComponents(kubernetesClient, endpointsReadyFuture,
serviceReadyFuture, podReadyFuture)
(driverPod, driverService)
}
}
}
Expand All @@ -338,22 +300,32 @@ private[spark] class Client(
.withController(true)
.build()
sslSecrets.foreach(secret => {
kubernetesClient.secrets().withName(secret.getMetadata.getName).edit()
val updatedSecret = kubernetesClient.secrets().withName(secret.getMetadata.getName).edit()
.editMetadata()
.addToOwnerReferences(driverPodOwnerRef)
.endMetadata()
.done()
kubernetesResourceCleaner.registerOrUpdateResource(updatedSecret)
})
kubernetesClient.secrets().withName(submitServerSecret.getMetadata.getName).edit()
.editMetadata()
.addToOwnerReferences(driverPodOwnerRef)
.endMetadata()
.done()
kubernetesClient.services().withName(driverService.getMetadata.getName).edit()
.editMetadata()
.addToOwnerReferences(driverPodOwnerRef)
.endMetadata()
.done()
val updatedSubmitServerSecret = kubernetesClient
.secrets()
.withName(submitServerSecret.getMetadata.getName)
.edit()
.editMetadata()
.addToOwnerReferences(driverPodOwnerRef)
.endMetadata()
.done()
kubernetesResourceCleaner.registerOrUpdateResource(updatedSubmitServerSecret)
val updatedService = kubernetesClient
.services()
.withName(driverService.getMetadata.getName)
.edit()
.editMetadata()
.addToOwnerReferences(driverPodOwnerRef)
.endMetadata()
.done()
kubernetesResourceCleaner.registerOrUpdateResource(updatedService)
updatedService
}

private def waitForReadyKubernetesComponents(
Expand Down Expand Up @@ -417,7 +389,7 @@ private[spark] class Client(
driverSubmitSslOptions: SSLOptions,
sslVolumes: Array[Volume],
sslVolumeMounts: Array[VolumeMount],
sslEnvs: Array[EnvVar]) = {
sslEnvs: Array[EnvVar]): Pod = {
val containerPorts = buildContainerPorts()
val probePingHttpGet = new HTTPGetActionBuilder()
.withScheme(if (driverSubmitSslOptions.enabled) "HTTPS" else "HTTP")
Expand Down Expand Up @@ -537,9 +509,11 @@ private[spark] class Client(
(securityManager.getSSLOptions(KUBERNETES_SUBMIT_SSL_NAMESPACE), isLocalKeyStore)
}

private def configureSsl(kubernetesClient: KubernetesClient, driverSubmitSslOptions: SSLOptions,
isKeyStoreLocalFile: Boolean):
(Array[EnvVar], Array[Volume], Array[VolumeMount], Array[Secret]) = {
private def configureSsl(
kubernetesClient: KubernetesClient,
driverSubmitSslOptions: SSLOptions,
isKeyStoreLocalFile: Boolean):
(Array[EnvVar], Array[Volume], Array[VolumeMount], Array[Secret]) = {
if (driverSubmitSslOptions.enabled) {
val sslSecretsMap = mutable.HashMap[String, String]()
val sslEnvs = mutable.Buffer[EnvVar]()
Expand Down Expand Up @@ -606,6 +580,7 @@ private[spark] class Client(
.withData(sslSecretsMap.asJava)
.withType("Opaque")
.done()
kubernetesResourceCleaner.registerOrUpdateResource(sslSecrets)
secrets += sslSecrets
(sslEnvs.toArray, Array(sslVolume), Array(sslVolumeMount), secrets.toArray)
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.kubernetes

import io.fabric8.kubernetes.api.model.HasMetadata
import io.fabric8.kubernetes.client.KubernetesClient
import scala.collection.mutable

import org.apache.spark.internal.Logging
import org.apache.spark.util.Utils

/**
 * Keeps track of Kubernetes resources that have been created so that all of them can later be
 * deleted from the cluster in one call.
 *
 * Resources are indexed by their (name, kind) pair, so registering a resource whose name and
 * kind match an existing entry replaces that entry.
 */
private[spark] class KubernetesResourceCleaner extends Logging {

  // Registered resources, keyed by (metadata name, kind).
  private val resources = mutable.HashMap.empty[(String, String), HasMetadata]

  /** Builds the lookup key for a resource from its metadata name and its kind. */
  private def keyOf(resource: HasMetadata): (String, String) =
    (resource.getMetadata.getName, resource.getKind)

  /**
   * Registers the given resource, replacing any previously registered resource that has the
   * same name and kind.
   *
   * Synchronized because deleteAllRegisteredResourcesFromKubernetes may be called from a
   * shutdown hook.
   *
   * @param resource the resource to track for later deletion
   */
  def registerOrUpdateResource(resource: HasMetadata): Unit = synchronized {
    resources(keyOf(resource)) = resource
  }

  /**
   * Stops tracking the resource with the same name and kind as the given one, if any is
   * registered. The resource itself is not touched in Kubernetes.
   *
   * @param resource the resource that should no longer be deleted by this cleaner
   */
  def unregisterResource(resource: HasMetadata): Unit = synchronized {
    resources -= keyOf(resource)
  }

  /**
   * Issues a delete for every registered resource through the given client and then forgets
   * all of them, whether or not the individual deletions succeeded.
   *
   * The whole operation runs while holding this object's monitor, so concurrent callers are
   * serialized and a second caller finds nothing left to delete.
   *
   * @param kubernetesClient the client used to delete the registered resources
   */
  def deleteAllRegisteredResourcesFromKubernetes(kubernetesClient: KubernetesClient): Unit =
    synchronized {
      logInfo(s"Deleting ${resources.size} registered Kubernetes resources:")
      for (registered <- resources.values) {
        // Each deletion is wrapped individually; presumably a non-fatal failure on one
        // resource is logged and does not prevent the remaining deletions.
        Utils.tryLogNonFatalError {
          kubernetesClient.resource(registered).delete()
        }
      }
      resources.clear()
    }
}

0 comments on commit e8359ca

Please sign in to comment.