cherry-pick 3.3 commits (apache#616)
* KE-39980 Bind the configMap to the KE pod

* minor fix dockerfile java version and host user (apache#510)

* KY-895 Spark driver pod cleanup when application is completed (apache#528)

* KY-895, Spark driver pod cleanup when application is completed (apache#358)

* KY-895, Spark driver pod cleanup when application is completed

* KY-895, release 3.1.1-kylin-4.x-r42-xuanwu

* KY-895, add config KUBERNETES_DELETE_DRIVER (apache#376)

Co-authored-by: Feng Zhu <[email protected]>

---------

Co-authored-by: Jiawei Li <[email protected]>
Co-authored-by: Zhiting Guo <[email protected]>
Co-authored-by: Feng Zhu <[email protected]>
4 people authored Apr 10, 2023
1 parent 327b6a1 commit cbbea03
Showing 5 changed files with 28 additions and 7 deletions.
@@ -555,6 +555,14 @@ private[spark] object Config extends Logging {
.checkValue(value => value > 0, "Maximum number of pending pods should be a positive integer")
.createWithDefault(Int.MaxValue)

+ val KUBERNETES_DELETE_DRIVER =
+ ConfigBuilder("spark.kubernetes.driver.deleteOnCompleted")
+ .doc("If set to false then driver pods will not be deleted in case " +
+ "of completion.")
+ .version("3.1.1")
+ .booleanConf
+ .createWithDefault(true)
+
val KUBERNETES_DRIVER_LABEL_PREFIX = "spark.kubernetes.driver.label."
val KUBERNETES_DRIVER_ANNOTATION_PREFIX = "spark.kubernetes.driver.annotation."
val KUBERNETES_DRIVER_SERVICE_ANNOTATION_PREFIX = "spark.kubernetes.driver.service.annotation."
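For reference, a minimal sketch of how a submitter could opt out of the new cleanup behaviour. The SparkConf form below is illustrative only; the same property can equally be passed with --conf on spark-submit or set in spark-defaults.conf:

  import org.apache.spark.SparkConf

  // Keep the driver pod after the application finishes, e.g. to inspect its logs.
  // spark.kubernetes.driver.deleteOnCompleted defaults to true, so cleanup stays on unless overridden.
  val conf = new SparkConf()
    .set("spark.kubernetes.driver.deleteOnCompleted", "false")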
@@ -202,7 +202,6 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
// The master URL has been checked for validity already in SparkSubmit.
// We just need to get rid of the "k8s://" prefix here.
val master = KubernetesUtils.parseMasterUrl(sparkConf.get("spark.master"))
- val watcher = new LoggingPodStatusWatcherImpl(kubernetesConf)

Utils.tryWithResource(SparkKubernetesClientFactory.createKubernetesClient(
master,
@@ -216,7 +215,7 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
kubernetesConf,
new KubernetesDriverBuilder(),
kubernetesClient,
- watcher)
+ new LoggingPodStatusWatcherImpl(kubernetesConf, kubernetesClient))
client.run()
}
}
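The watcher is now constructed inside the Utils.tryWithResource block because it needs the live KubernetesClient in order to delete the driver pod on completion. A simplified sketch of the resulting pattern, with the client factory arguments elided (not the literal Spark code):

  Utils.tryWithResource(SparkKubernetesClientFactory.createKubernetesClient(/* master, namespace, ... */)) { kubernetesClient =>
    // The watcher may only use the client while this block keeps it open.
    val watcher = new LoggingPodStatusWatcherImpl(kubernetesConf, kubernetesClient)
    new Client(kubernetesConf, new KubernetesDriverBuilder(), kubernetesClient, watcher).run()
  }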
@@ -17,7 +17,7 @@
package org.apache.spark.deploy.k8s.submit

import io.fabric8.kubernetes.api.model.Pod
- import io.fabric8.kubernetes.client.{Watcher, WatcherException}
+ import io.fabric8.kubernetes.client.{KubernetesClient, Watcher, WatcherException}
import io.fabric8.kubernetes.client.Watcher.Action

import org.apache.spark.deploy.k8s.Config._
@@ -36,7 +36,8 @@ private[k8s] trait LoggingPodStatusWatcher extends Watcher[Pod] {
*
* @param conf kubernetes driver conf.
*/
- private[k8s] class LoggingPodStatusWatcherImpl(conf: KubernetesDriverConf)
+ private[k8s] class LoggingPodStatusWatcherImpl(conf: KubernetesDriverConf,
+ kubernetesClient: KubernetesClient)
extends LoggingPodStatusWatcher with Logging {

private val appId = conf.appId
@@ -110,6 +111,9 @@ private[k8s] class LoggingPodStatusWatcherImpl(conf: KubernetesDriverConf)
pod.map { p => s"Container final statuses:\n\n${containersDescription(p)}" }
.getOrElse("No containers were found in the driver pod."))
logInfo(s"Application ${conf.appName} with submission ID $sId finished")
+ if (conf.get(KUBERNETES_DELETE_DRIVER)) {
+ pod.map { p => kubernetesClient.pods().withName(p.getMetadata.getName).delete() }
+ }
}
podCompleted
} else {
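Once the application reaches a terminal state, the watcher now removes the driver pod itself unless spark.kubernetes.driver.deleteOnCompleted is set to false. A condensed sketch of the new completion path; the helper name is hypothetical and not part of the diff, and it assumes that deleting the driver pod also lets Kubernetes garbage-collect resources owner-referenced to it, such as executor pods and config maps:

  // Hypothetical helper illustrating the guarded cleanup added above.
  private def cleanUpDriverPod(): Unit = {
    if (conf.get(KUBERNETES_DELETE_DRIVER)) { // defaults to true
      pod.foreach { p =>
        kubernetesClient.pods().withName(p.getMetadata.getName).delete()
      }
    }
  }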
@@ -16,6 +16,7 @@
*/
package org.apache.spark.scheduler.cluster.k8s

+ import java.net.InetAddress
import java.util.concurrent.{ScheduledExecutorService, TimeUnit}

import scala.concurrent.Future
@@ -76,10 +77,19 @@ private[spark] class KubernetesClusterSchedulerBackend(
val labels =
Map(SPARK_APP_ID_LABEL -> applicationId(), SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE)
val configMap = KubernetesClientUtils.buildConfigMap(configMapName, confFilesMap, labels)
- KubernetesUtils.addOwnerReference(driverPod.orNull, Seq(configMap))
+ KubernetesUtils.addOwnerReference(getDriverPodOrLocalPod(driverPod), Seq(configMap))
kubernetesClient.configMaps().create(configMap)
}

+ private def getDriverPodOrLocalPod(driverPod: Option[Pod]): Pod = {
+ if (driverPod.isDefined) {
+ return driverPod.get
+ }
+ val podName = InetAddress.getLocalHost.getHostName
+ logInfo(s"LocalPod={podName=$podName}")
+ kubernetesClient.pods().withName(podName).get()
+ }
+
/**
* Get an application ID associated with the job.
* This returns the string value of spark.app.id if set, otherwise
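With getDriverPodOrLocalPod in place, the executor config map always gets a pod as its owner: the driver pod when the scheduler backend knows it, otherwise the pod whose name matches the local hostname (the KE pod case from the commit message). For readers unfamiliar with the mechanism, a rough sketch of what the owner-reference call is expected to do, modelled on KubernetesUtils.addOwnerReference and not part of this diff:

  import java.util.Collections
  import io.fabric8.kubernetes.api.model.{HasMetadata, OwnerReferenceBuilder, Pod}

  // Stamp each resource with an OwnerReference pointing at the pod, so Kubernetes
  // garbage-collects the resource when that pod is deleted.
  def addOwnerReferenceSketch(ownerPod: Pod, resources: Seq[HasMetadata]): Unit = {
    if (ownerPod != null) {
      val ref = new OwnerReferenceBuilder()
        .withName(ownerPod.getMetadata.getName)
        .withApiVersion(ownerPod.getApiVersion)
        .withUid(ownerPod.getMetadata.getUid)
        .withKind(ownerPod.getKind)
        .withController(true)
        .build()
      resources.foreach(_.getMetadata.setOwnerReferences(Collections.singletonList(ref)))
    }
  }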
@@ -14,11 +14,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
- ARG java_image_tag=11-jre-slim
+ ARG java_image_tag=8-jdk-slim

FROM openjdk:${java_image_tag}

- ARG spark_uid=185
+ ARG spark_uid=root

# Before building the docker image, first build and make a Spark distribution following
# the instructions in http://spark.apache.org/docs/latest/building-spark.html.