From 1e197b5bc2258f9c0657cf9d792005c540ccb7f4 Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Tue, 6 Jun 2023 19:06:49 -0700 Subject: [PATCH] [SPARK-43356][K8S] Migrate deprecated createOrReplace to serverSideApply ### What changes were proposed in this pull request? The deprecation message of `createOrReplace` indicates that we should change `createOrReplace` to `serverSideApply` instead. ``` deprecated please use {link ServerSideApplicable#serverSideApply()} or attempt a create and edit/patch operation. ``` The change is not fully equivalent, but I believe it's reasonable. > With the caveat that the user may choose not to use forcing if they want to know when there are conflicting changes. > > Also unlike createOrReplace if the resourceVersion is set on the resource and a replace is attempted, it will be optimistically locked. See more details at https://github.com/fabric8io/kubernetes-client/pull/5073 ### Why are the changes needed? Remove usage of deprecated API. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass GA. Closes #41136 from pan3793/SPARK-43356. Authored-by: Cheng Pan Signed-off-by: Dongjoon Hyun --- .../deploy/k8s/submit/KubernetesClientApplication.scala | 6 +++--- .../org/apache/spark/deploy/k8s/submit/ClientSuite.scala | 5 ++++- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala index 9f9b5655e26fc..2b2dad1cf13c7 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala @@ -137,7 +137,7 @@ private[spark] class Client( // setup resources before pod creation val preKubernetesResources = resolvedDriverSpec.driverPreKubernetesResources try { - kubernetesClient.resourceList(preKubernetesResources: _*).createOrReplace() + kubernetesClient.resourceList(preKubernetesResources: _*).forceConflicts().serverSideApply() } catch { case NonFatal(e) => logError("Please check \"kubectl auth can-i create [resource]\" first." + @@ -161,7 +161,7 @@ private[spark] class Client( // Refresh all pre-resources' owner references try { addOwnerReference(createdDriverPod, preKubernetesResources) - kubernetesClient.resourceList(preKubernetesResources: _*).createOrReplace() + kubernetesClient.resourceList(preKubernetesResources: _*).forceConflicts().serverSideApply() } catch { case NonFatal(e) => kubernetesClient.pods().resource(createdDriverPod).delete() @@ -173,7 +173,7 @@ private[spark] class Client( try { val otherKubernetesResources = resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap) addOwnerReference(createdDriverPod, otherKubernetesResources) - kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace() + kubernetesClient.resourceList(otherKubernetesResources: _*).forceConflicts().serverSideApply() } catch { case NonFatal(e) => kubernetesClient.pods().resource(createdDriverPod).delete() diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala index 8c2be6c142d74..a813b3a876f87 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala @@ -181,6 +181,8 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter { createdPodArgumentCaptor = ArgumentCaptor.forClass(classOf[Pod]) createdResourcesArgumentCaptor = ArgumentCaptor.forClass(classOf[HasMetadata]) when(podsWithNamespace.resource(fullExpectedPod())).thenReturn(namedPods) + when(resourceList.forceConflicts()).thenReturn(resourceList) + when(namedPods.serverSideApply()).thenReturn(podWithOwnerReference()) when(namedPods.create()).thenReturn(podWithOwnerReference()) when(namedPods.watch(loggingPodStatusWatcher)).thenReturn(mock[Watch]) val sId = submissionId(kconf.namespace, POD_NAME) @@ -309,7 +311,8 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter { when(podsWithNamespace.resource(fullExpectedPod(expectedKeyToPaths))) .thenReturn(namedPods) - when(namedPods.create()).thenReturn(podWithOwnerReference(expectedKeyToPaths)) + when(namedPods.forceConflicts()).thenReturn(namedPods) + when(namedPods.serverSideApply()).thenReturn(podWithOwnerReference(expectedKeyToPaths)) kconf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf, resourceNamePrefix = Some(KUBERNETES_RESOURCE_PREFIX))