From 1eac84f2d4100ab9f6dd105b7003c478d5368b27 Mon Sep 17 00:00:00 2001 From: fwang12 Date: Fri, 17 Nov 2023 10:43:50 +0800 Subject: [PATCH] save --- .../org/apache/kyuubi/config/KyuubiConf.scala | 14 +++++--- .../KubernetesApplicationAuditLogger.scala | 5 +-- .../KubernetesApplicationOperation.scala | 36 ++++++++++--------- 3 files changed, 33 insertions(+), 22 deletions(-) diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala index f8f617bd166..387c8d6193d 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala @@ -1239,13 +1239,14 @@ object KyuubiConf { .booleanConf .createWithDefault(false) - val KUBERNETES_APPLICATION_STATE_FROM_CONTAINER: ConfigEntry[Boolean] = - buildConf("kyuubi.kubernetes.application.state.fromContainer") + val KUBERNETES_APPLICATION_STATE_SOURCE: ConfigEntry[String] = + buildConf("kyuubi.kubernetes.application.state.source") .doc("If set to true then the application state will be retrieved from the container " + "instead of the pod.") .version("1.8.1") - .booleanConf - .createWithDefault(false) + .stringConf + .checkValues(KubernetesApplicationStateSource) + .createWithDefault(KubernetesApplicationStateSource.POD.toString) val KUBERNETES_APPLICATION_STATE_CONTAINER: ConfigEntry[String] = buildConf("kyuubi.kubernetes.application.state.container") @@ -1254,6 +1255,11 @@ object KyuubiConf { .stringConf .createWithDefault("spark-kubernetes-driver") + object KubernetesApplicationStateSource extends Enumeration { + type KubernetesApplicationStateSource = Value + val POD, CONTAINER = Value + } + // /////////////////////////////////////////////////////////////////////////////////////////////// // SQL Engine Configuration // // /////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationAuditLogger.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationAuditLogger.scala index 2558479a4fb..dd2ef18dea6 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationAuditLogger.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationAuditLogger.scala @@ -20,6 +20,7 @@ package org.apache.kyuubi.engine import io.fabric8.kubernetes.api.model.Pod import org.apache.kyuubi.Logging +import org.apache.kyuubi.config.KyuubiConf.KubernetesApplicationStateSource.KubernetesApplicationStateSource import org.apache.kyuubi.engine.KubernetesApplicationOperation.{toApplicationStateAndError, LABEL_KYUUBI_UNIQUE_KEY, SPARK_APP_ID_LABEL} object KubernetesApplicationAuditLogger extends Logging { @@ -30,7 +31,7 @@ object KubernetesApplicationAuditLogger extends Logging { def audit( kubernetesInfo: KubernetesInfo, pod: Pod, - appStateFromContainer: Boolean, + appStateFrom: KubernetesApplicationStateSource, appStateContainer: String): Unit = { val sb = AUDIT_BUFFER.get() sb.setLength(0) @@ -40,7 +41,7 @@ object KubernetesApplicationAuditLogger extends Logging { sb.append(s"pod=${pod.getMetadata.getName}").append("\t") sb.append(s"appId=${pod.getMetadata.getLabels.get(SPARK_APP_ID_LABEL)}").append("\t") val (appState, appError) = - toApplicationStateAndError(pod, appStateFromContainer, appStateContainer) + toApplicationStateAndError(pod, appStateFrom, appStateContainer) sb.append(s"appState=$appState").append("\t") sb.append(s"appError='${appError.getOrElse("")}'") info(sb.toString()) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala index d3207a444d6..fca04a07f43 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala @@ -30,6 +30,8 @@ import io.fabric8.kubernetes.client.informers.{ResourceEventHandler, SharedIndex import org.apache.kyuubi.{KyuubiException, Logging, Utils} import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.config.KyuubiConf.KubernetesApplicationStateSource +import org.apache.kyuubi.config.KyuubiConf.KubernetesApplicationStateSource.KubernetesApplicationStateSource import org.apache.kyuubi.engine.ApplicationState.{isTerminated, ApplicationState, FAILED, FINISHED, NOT_FOUND, PENDING, RUNNING, UNKNOWN} import org.apache.kyuubi.util.KubernetesUtils @@ -48,8 +50,10 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { kyuubiConf.get(KyuubiConf.KUBERNETES_CONTEXT_ALLOW_LIST) private def allowedNamespaces: Set[String] = kyuubiConf.get(KyuubiConf.KUBERNETES_NAMESPACE_ALLOW_LIST) - private def appStateFromContainer: Boolean = - kyuubiConf.get(KyuubiConf.KUBERNETES_APPLICATION_STATE_FROM_CONTAINER) + + private def appStateSource: KubernetesApplicationStateSource = + KubernetesApplicationStateSource.withName( + kyuubiConf.get(KyuubiConf.KUBERNETES_APPLICATION_STATE_SOURCE)) private def appStateContainer: String = kyuubiConf.get(KyuubiConf.KUBERNETES_APPLICATION_STATE_CONTAINER) @@ -245,7 +249,7 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { KubernetesApplicationAuditLogger.audit( kubernetesInfo, pod, - appStateFromContainer, + appStateSource, appStateContainer) } } @@ -253,14 +257,14 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { override def onUpdate(oldPod: Pod, newPod: Pod): Unit = { if (isSparkEnginePod(newPod)) { updateApplicationState(kubernetesInfo, newPod) - val appState = toApplicationState(newPod, appStateFromContainer, appStateContainer) + val appState = toApplicationState(newPod, appStateSource, appStateContainer) if (isTerminated(appState)) { markApplicationTerminated(newPod) } KubernetesApplicationAuditLogger.audit( kubernetesInfo, newPod, - appStateFromContainer, + appStateSource, appStateContainer) } } @@ -272,7 +276,7 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { KubernetesApplicationAuditLogger.audit( kubernetesInfo, pod, - appStateFromContainer, + appStateSource, appStateContainer) } } @@ -285,7 +289,7 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { private def updateApplicationState(kubernetesInfo: KubernetesInfo, pod: Pod): Unit = { val (appState, appError) = - toApplicationStateAndError(pod, appStateFromContainer, appStateContainer) + toApplicationStateAndError(pod, appStateSource, appStateContainer) debug(s"Driver Informer changes pod: ${pod.getMetadata.getName} to state: $appState") appInfoStore.put( pod.getMetadata.getLabels.get(LABEL_KYUUBI_UNIQUE_KEY), @@ -301,7 +305,7 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { if (cleanupTerminatedAppInfoTrigger.getIfPresent(key) == null) { cleanupTerminatedAppInfoTrigger.put( key, - toApplicationState(pod, appStateFromContainer, appStateContainer)) + toApplicationState(pod, appStateSource, appStateContainer)) } } } @@ -316,20 +320,20 @@ object KubernetesApplicationOperation extends Logging { def toApplicationState( pod: Pod, - appStateFromContainer: Boolean, + appStateFrom: KubernetesApplicationStateSource, appStateContainer: String): ApplicationState = { - toApplicationStateAndError(pod, appStateFromContainer, appStateContainer)._1 + toApplicationStateAndError(pod, appStateFrom, appStateContainer)._1 } def toApplicationStateAndError( pod: Pod, - appStateFromContainer: Boolean, + appStateFrom: KubernetesApplicationStateSource, appStateContainer: String): (ApplicationState, Option[String]) = { - val containerStateToBuildAppState = if (appStateFromContainer) { - pod.getStatus.getContainerStatuses.asScala.find(_.getState == appStateContainer).map( - _.getState) - } else { - None + val containerStateToBuildAppState = appStateFrom match { + case KubernetesApplicationStateSource.CONTAINER => + pod.getStatus.getContainerStatuses.asScala + .find(_.getState == appStateContainer).map(_.getState) + case KubernetesApplicationStateSource.POD => None } val applicationState = containerStateToBuildAppState.map(containerStateToApplicationState) .getOrElse(podStateToApplicationState(pod.getStatus.getPhase))