Skip to content

Commit

Permalink
save
Browse files Browse the repository at this point in the history
  • Loading branch information
turboFei committed Nov 17, 2023
1 parent bca19be commit 1eac84f
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1239,13 +1239,14 @@ object KyuubiConf {
.booleanConf
.createWithDefault(false)

val KUBERNETES_APPLICATION_STATE_FROM_CONTAINER: ConfigEntry[Boolean] =
buildConf("kyuubi.kubernetes.application.state.fromContainer")
val KUBERNETES_APPLICATION_STATE_SOURCE: ConfigEntry[String] =
buildConf("kyuubi.kubernetes.application.state.source")
.doc("If set to true then the application state will be retrieved from the container " +
"instead of the pod.")
.version("1.8.1")
.booleanConf
.createWithDefault(false)
.stringConf
.checkValues(KubernetesApplicationStateSource)
.createWithDefault(KubernetesApplicationStateSource.POD.toString)

val KUBERNETES_APPLICATION_STATE_CONTAINER: ConfigEntry[String] =
buildConf("kyuubi.kubernetes.application.state.container")
Expand All @@ -1254,6 +1255,11 @@ object KyuubiConf {
.stringConf
.createWithDefault("spark-kubernetes-driver")

object KubernetesApplicationStateSource extends Enumeration {
type KubernetesApplicationStateSource = Value
val POD, CONTAINER = Value
}

// ///////////////////////////////////////////////////////////////////////////////////////////////
// SQL Engine Configuration //
// ///////////////////////////////////////////////////////////////////////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.kyuubi.engine
import io.fabric8.kubernetes.api.model.Pod

import org.apache.kyuubi.Logging
import org.apache.kyuubi.config.KyuubiConf.KubernetesApplicationStateSource.KubernetesApplicationStateSource
import org.apache.kyuubi.engine.KubernetesApplicationOperation.{toApplicationStateAndError, LABEL_KYUUBI_UNIQUE_KEY, SPARK_APP_ID_LABEL}

object KubernetesApplicationAuditLogger extends Logging {
Expand All @@ -30,7 +31,7 @@ object KubernetesApplicationAuditLogger extends Logging {
def audit(
kubernetesInfo: KubernetesInfo,
pod: Pod,
appStateFromContainer: Boolean,
appStateFrom: KubernetesApplicationStateSource,
appStateContainer: String): Unit = {
val sb = AUDIT_BUFFER.get()
sb.setLength(0)
Expand All @@ -40,7 +41,7 @@ object KubernetesApplicationAuditLogger extends Logging {
sb.append(s"pod=${pod.getMetadata.getName}").append("\t")
sb.append(s"appId=${pod.getMetadata.getLabels.get(SPARK_APP_ID_LABEL)}").append("\t")
val (appState, appError) =
toApplicationStateAndError(pod, appStateFromContainer, appStateContainer)
toApplicationStateAndError(pod, appStateFrom, appStateContainer)
sb.append(s"appState=$appState").append("\t")
sb.append(s"appError='${appError.getOrElse("")}'")
info(sb.toString())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import io.fabric8.kubernetes.client.informers.{ResourceEventHandler, SharedIndex

import org.apache.kyuubi.{KyuubiException, Logging, Utils}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.KubernetesApplicationStateSource
import org.apache.kyuubi.config.KyuubiConf.KubernetesApplicationStateSource.KubernetesApplicationStateSource
import org.apache.kyuubi.engine.ApplicationState.{isTerminated, ApplicationState, FAILED, FINISHED, NOT_FOUND, PENDING, RUNNING, UNKNOWN}
import org.apache.kyuubi.util.KubernetesUtils

Expand All @@ -48,8 +50,10 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
kyuubiConf.get(KyuubiConf.KUBERNETES_CONTEXT_ALLOW_LIST)
private def allowedNamespaces: Set[String] =
kyuubiConf.get(KyuubiConf.KUBERNETES_NAMESPACE_ALLOW_LIST)
private def appStateFromContainer: Boolean =
kyuubiConf.get(KyuubiConf.KUBERNETES_APPLICATION_STATE_FROM_CONTAINER)

private def appStateSource: KubernetesApplicationStateSource =
KubernetesApplicationStateSource.withName(
kyuubiConf.get(KyuubiConf.KUBERNETES_APPLICATION_STATE_SOURCE))
private def appStateContainer: String =
kyuubiConf.get(KyuubiConf.KUBERNETES_APPLICATION_STATE_CONTAINER)

Expand Down Expand Up @@ -245,22 +249,22 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
KubernetesApplicationAuditLogger.audit(
kubernetesInfo,
pod,
appStateFromContainer,
appStateSource,
appStateContainer)
}
}

override def onUpdate(oldPod: Pod, newPod: Pod): Unit = {
if (isSparkEnginePod(newPod)) {
updateApplicationState(kubernetesInfo, newPod)
val appState = toApplicationState(newPod, appStateFromContainer, appStateContainer)
val appState = toApplicationState(newPod, appStateSource, appStateContainer)
if (isTerminated(appState)) {
markApplicationTerminated(newPod)
}
KubernetesApplicationAuditLogger.audit(
kubernetesInfo,
newPod,
appStateFromContainer,
appStateSource,
appStateContainer)
}
}
Expand All @@ -272,7 +276,7 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
KubernetesApplicationAuditLogger.audit(
kubernetesInfo,
pod,
appStateFromContainer,
appStateSource,
appStateContainer)
}
}
Expand All @@ -285,7 +289,7 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {

private def updateApplicationState(kubernetesInfo: KubernetesInfo, pod: Pod): Unit = {
val (appState, appError) =
toApplicationStateAndError(pod, appStateFromContainer, appStateContainer)
toApplicationStateAndError(pod, appStateSource, appStateContainer)
debug(s"Driver Informer changes pod: ${pod.getMetadata.getName} to state: $appState")
appInfoStore.put(
pod.getMetadata.getLabels.get(LABEL_KYUUBI_UNIQUE_KEY),
Expand All @@ -301,7 +305,7 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
if (cleanupTerminatedAppInfoTrigger.getIfPresent(key) == null) {
cleanupTerminatedAppInfoTrigger.put(
key,
toApplicationState(pod, appStateFromContainer, appStateContainer))
toApplicationState(pod, appStateSource, appStateContainer))
}
}
}
Expand All @@ -316,20 +320,20 @@ object KubernetesApplicationOperation extends Logging {

def toApplicationState(
pod: Pod,
appStateFromContainer: Boolean,
appStateFrom: KubernetesApplicationStateSource,
appStateContainer: String): ApplicationState = {
toApplicationStateAndError(pod, appStateFromContainer, appStateContainer)._1
toApplicationStateAndError(pod, appStateFrom, appStateContainer)._1
}

def toApplicationStateAndError(
pod: Pod,
appStateFromContainer: Boolean,
appStateFrom: KubernetesApplicationStateSource,
appStateContainer: String): (ApplicationState, Option[String]) = {
val containerStateToBuildAppState = if (appStateFromContainer) {
pod.getStatus.getContainerStatuses.asScala.find(_.getState == appStateContainer).map(
_.getState)
} else {
None
val containerStateToBuildAppState = appStateFrom match {
case KubernetesApplicationStateSource.CONTAINER =>
pod.getStatus.getContainerStatuses.asScala
.find(_.getState == appStateContainer).map(_.getState)
case KubernetesApplicationStateSource.POD => None
}
val applicationState = containerStateToBuildAppState.map(containerStateToApplicationState)
.getOrElse(podStateToApplicationState(pod.getStatus.getPhase))
Expand Down

0 comments on commit 1eac84f

Please sign in to comment.