diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala index 9f9b5655e26fc..2b2dad1cf13c7 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala @@ -137,7 +137,7 @@ private[spark] class Client( // setup resources before pod creation val preKubernetesResources = resolvedDriverSpec.driverPreKubernetesResources try { - kubernetesClient.resourceList(preKubernetesResources: _*).createOrReplace() + kubernetesClient.resourceList(preKubernetesResources: _*).forceConflicts().serverSideApply() } catch { case NonFatal(e) => logError("Please check \"kubectl auth can-i create [resource]\" first." + @@ -161,7 +161,7 @@ private[spark] class Client( // Refresh all pre-resources' owner references try { addOwnerReference(createdDriverPod, preKubernetesResources) - kubernetesClient.resourceList(preKubernetesResources: _*).createOrReplace() + kubernetesClient.resourceList(preKubernetesResources: _*).forceConflicts().serverSideApply() } catch { case NonFatal(e) => kubernetesClient.pods().resource(createdDriverPod).delete() @@ -173,7 +173,7 @@ private[spark] class Client( try { val otherKubernetesResources = resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap) addOwnerReference(createdDriverPod, otherKubernetesResources) - kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace() + kubernetesClient.resourceList(otherKubernetesResources: _*).forceConflicts().serverSideApply() } catch { case NonFatal(e) => kubernetesClient.pods().resource(createdDriverPod).delete() diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala index 8c2be6c142d74..a813b3a876f87 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala @@ -181,6 +181,8 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter { createdPodArgumentCaptor = ArgumentCaptor.forClass(classOf[Pod]) createdResourcesArgumentCaptor = ArgumentCaptor.forClass(classOf[HasMetadata]) when(podsWithNamespace.resource(fullExpectedPod())).thenReturn(namedPods) + when(resourceList.forceConflicts()).thenReturn(resourceList) + when(namedPods.serverSideApply()).thenReturn(podWithOwnerReference()) when(namedPods.create()).thenReturn(podWithOwnerReference()) when(namedPods.watch(loggingPodStatusWatcher)).thenReturn(mock[Watch]) val sId = submissionId(kconf.namespace, POD_NAME) @@ -309,7 +311,8 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter { when(podsWithNamespace.resource(fullExpectedPod(expectedKeyToPaths))) .thenReturn(namedPods) - when(namedPods.create()).thenReturn(podWithOwnerReference(expectedKeyToPaths)) + when(namedPods.forceConflicts()).thenReturn(namedPods) + when(namedPods.serverSideApply()).thenReturn(podWithOwnerReference(expectedKeyToPaths)) kconf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf, resourceNamePrefix = Some(KUBERNETES_RESOURCE_PREFIX))