From 1f44bdca4bd4800fc1d1932901039ac08336f0d4 Mon Sep 17 00:00:00 2001 From: Yicheng-Lu-llll <51814063+Yicheng-Lu-llll@users.noreply.github.com> Date: Sat, 13 Apr 2024 06:26:38 +0800 Subject: [PATCH] [RayJob] Add Tests for Atomic Suspend Operation (#2050) --- .../ray/rayjob_controller_suspended_test.go | 289 +++++++----------- .../controllers/ray/rayjob_controller_test.go | 29 +- .../ray/rayservice_controller_test.go | 10 +- .../controllers/ray/suite_helpers_test.go | 58 +++- 4 files changed, 167 insertions(+), 219 deletions(-) diff --git a/ray-operator/controllers/ray/rayjob_controller_suspended_test.go b/ray-operator/controllers/ray/rayjob_controller_suspended_test.go index b1d076170c..6a2379b178 100644 --- a/ray-operator/controllers/ray/rayjob_controller_suspended_test.go +++ b/ray-operator/controllers/ray/rayjob_controller_suspended_test.go @@ -17,242 +17,177 @@ package ray import ( "context" - "fmt" "time" - "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/api/resource" - - "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils" - . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" - rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" - - corev1 "k8s.io/api/core/v1" - "k8s.io/utils/pointer" - + "github.com/ray-project/kuberay/ray-operator/controllers/ray/common" batchv1 "k8s.io/api/batch/v1" - - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "sigs.k8s.io/controller-runtime/pkg/client" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/client-go/util/retry" ) -var _ = Context("Inside the default namespace", func() { - ctx := context.TODO() - var workerPods corev1.PodList - var headPods corev1.PodList - - mySuspendedRayCluster := &rayv1.RayCluster{} - - mySuspendedRayJob := &rayv1.RayJob{ - ObjectMeta: metav1.ObjectMeta{ - Name: "rayjob-test-suspend", - Namespace: "default", - }, - Spec: rayv1.RayJobSpec{ - ShutdownAfterJobFinishes: true, - Suspend: true, - Entrypoint: "sleep 999", - RayClusterSpec: &rayv1.RayClusterSpec{ - RayVersion: "2.9.0", - HeadGroupSpec: rayv1.HeadGroupSpec{ - RayStartParams: map[string]string{}, - Template: corev1.PodTemplateSpec{ - Spec: corev1.PodSpec{ - Containers: []corev1.Container{ - { - Name: "ray-head", - Image: "rayproject/ray:2.8.0", - Resources: corev1.ResourceRequirements{ - Limits: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("1"), - corev1.ResourceMemory: resource.MustParse("2Gi"), - }, - Requests: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("1"), - corev1.ResourceMemory: resource.MustParse("2Gi"), - }, - }, - Ports: []corev1.ContainerPort{ - { - Name: "gcs-server", - ContainerPort: 6379, - }, - { - Name: "dashboard", - ContainerPort: 8265, - }, - { - Name: "head", - ContainerPort: 10001, - }, - { - Name: "dashboard-agent", - ContainerPort: 52365, - }, - }, - }, - }, - }, - }, - }, - WorkerGroupSpecs: []rayv1.WorkerGroupSpec{ - { - Replicas: pointer.Int32(3), - MinReplicas: pointer.Int32(0), - MaxReplicas: pointer.Int32(10000), - GroupName: "small-group", - RayStartParams: map[string]string{}, - Template: corev1.PodTemplateSpec{ - Spec: corev1.PodSpec{ - Containers: []corev1.Container{ - { - Name: "ray-worker", - Image: "rayproject/ray:2.8.0", - }, - }, - }, - }, - }, - }, - }, - }, - } - +var _ = Context("RayJob with suspend operation", func() { Describe("When creating a rayjob with suspend == true", func() { + ctx := context.Background() + namespace := "default" + rayCluster := &rayv1.RayCluster{} + rayJob := rayJobTemplate("rayjob-suspend", namespace) + 
rayJob.Spec.Suspend = true + It("should create a rayjob object", func() { - err := k8sClient.Create(ctx, mySuspendedRayJob) + err := k8sClient.Create(ctx, rayJob) Expect(err).NotTo(HaveOccurred(), "failed to create test RayJob resource") }) - It("should see a rayjob object", func() { - Eventually( - getResourceFunc(ctx, client.ObjectKey{Name: mySuspendedRayJob.Name, Namespace: "default"}, mySuspendedRayJob), - time.Second*3, time.Millisecond*500).Should(BeNil(), "My myRayJob = %v", mySuspendedRayJob.Name) - }) - It("should have deployment status suspended", func() { Eventually( - getRayJobDeploymentStatus(ctx, mySuspendedRayJob), + getRayJobDeploymentStatus(ctx, rayJob), time.Second*5, time.Millisecond*500).Should(Equal(rayv1.JobDeploymentStatusSuspended)) }) It("should NOT create a raycluster object", func() { Consistently( - getRayClusterNameForRayJob(ctx, mySuspendedRayJob), + getRayClusterNameForRayJob(ctx, rayJob), time.Second*3, time.Millisecond*500).Should(BeEmpty()) }) It("should unsuspend a rayjob object", func() { - mySuspendedRayJob.Spec.Suspend = false - err := k8sClient.Update(ctx, mySuspendedRayJob) - Expect(err).NotTo(HaveOccurred(), "failed to update test RayJob resource") + err := updateRayJobSuspendField(ctx, rayJob, false) + Expect(err).NotTo(HaveOccurred(), "failed to update RayJob") }) It("should create a raycluster object", func() { // Ray Cluster name can be present on RayJob's CRD Eventually( - getRayClusterNameForRayJob(ctx, mySuspendedRayJob), + getRayClusterNameForRayJob(ctx, rayJob), time.Second*15, time.Millisecond*500).Should(Not(BeEmpty())) // The actual cluster instance and underlying resources SHOULD be created when suspend == false Eventually( // k8sClient client does not throw error if cluster IS found - getResourceFunc(ctx, client.ObjectKey{Name: mySuspendedRayJob.Status.RayClusterName, Namespace: "default"}, mySuspendedRayCluster), + getResourceFunc(ctx, common.RayJobRayClusterNamespacedName(rayJob), rayCluster), time.Second*3, time.Millisecond*500).Should(BeNil()) }) - It("should create 3 workers", func() { - Eventually( - listResourceFunc(ctx, &workerPods, client.MatchingLabels{ - utils.RayClusterLabelKey: mySuspendedRayCluster.Name, - utils.RayNodeGroupLabelKey: "small-group", + It("should NOT create the underlying K8s job yet because the cluster is not ready", func() { + underlyingK8sJob := &batchv1.Job{} + Consistently( + // k8sClient client throws error if resource not found + func() bool { + err := getResourceFunc(ctx, common.RayJobK8sJobNamespacedName(rayJob), underlyingK8sJob)() + return errors.IsNotFound(err) }, - &client.ListOptions{Namespace: "default"}), - time.Second*15, time.Millisecond*500).Should(Equal(3), fmt.Sprintf("workerGroup %v", workerPods.Items)) - if len(workerPods.Items) > 0 { - Expect(workerPods.Items[0].Status.Phase).Should(Or(Equal(corev1.PodRunning), Equal(corev1.PodPending))) - } + time.Second*3, time.Millisecond*500).Should(BeTrue()) }) - It("should create a head pod resource", func() { - err := k8sClient.List(ctx, &headPods, - client.MatchingLabels{ - utils.RayClusterLabelKey: mySuspendedRayCluster.Name, - utils.RayNodeGroupLabelKey: utils.RayNodeHeadGroupLabelValue, - }, - &client.ListOptions{Namespace: "default"}, - client.InNamespace(mySuspendedRayCluster.Namespace)) - - Expect(err).NotTo(HaveOccurred(), "failed list head pods") - Expect(len(headPods.Items)).Should(BeNumerically("==", 1), "My head pod list= %v", headPods.Items) + It("should be able to update all Pods to Running", func() { + 
updateHeadPodToRunningAndReady(ctx, rayJob.Status.RayClusterName, namespace) + updateWorkerPodsToRunningAndReady(ctx, rayJob.Status.RayClusterName, namespace) + }) - pod := &corev1.Pod{} - if len(headPods.Items) > 0 { - pod = &headPods.Items[0] - } + It("Dashboard URL should be set", func() { Eventually( - getResourceFunc(ctx, client.ObjectKey{Name: pod.Name, Namespace: "default"}, pod), - time.Second*3, time.Millisecond*500).Should(BeNil(), "My head pod = %v", pod) - Expect(pod.Status.Phase).Should(Or(Equal(corev1.PodPending))) + getDashboardURLForRayJob(ctx, rayJob), + time.Second*3, time.Millisecond*500).Should(HavePrefix(rayJob.Name), "Dashboard URL = %v", rayJob.Status.DashboardURL) }) - It("should NOT create the underlying K8s job yet because the cluster is not ready", func() { + It("should create the underlying Kubernetes Job object", func() { underlyingK8sJob := &batchv1.Job{} + // The underlying Kubernetes Job should be created when the RayJob is created Eventually( - // k8sClient client throws error if resource not found - func() bool { - err := getResourceFunc(ctx, client.ObjectKey{Name: mySuspendedRayJob.Name, Namespace: "default"}, underlyingK8sJob)() - return errors.IsNotFound(err) - }, - time.Second*10, time.Millisecond*500).Should(BeTrue()) + getResourceFunc(ctx, common.RayJobK8sJobNamespacedName(rayJob), underlyingK8sJob), + time.Second*3, time.Millisecond*500).Should(BeNil(), "Expected Kubernetes job to be present") }) + }) - It("should be able to update all Pods to Running", func() { - // We need to manually update Pod statuses otherwise they'll always be Pending. - // envtest doesn't create a full K8s cluster. It's only the control plane. - // There's no container runtime or any other K8s controllers. - // So Pods are created, but no controller updates them from Pending to Running. 
-		// See https://book.kubebuilder.io/reference/envtest.html
+	Describe("RayJob suspend operation should be atomic", func() {
+		ctx := context.Background()
+		namespace := "default"
+		rayJob := rayJobTemplate("rayjob-atomic-suspend", namespace)
+		rayCluster := &rayv1.RayCluster{}
 
-		for _, headPod := range headPods.Items {
-			headPod.Status.Phase = corev1.PodRunning
-			Expect(k8sClient.Status().Update(ctx, &headPod)).Should(BeNil())
-		}
+		It("Create a RayJob custom resource", func() {
+			err := k8sClient.Create(ctx, rayJob)
+			Expect(err).NotTo(HaveOccurred(), "Failed to create RayJob")
+		})
 
+		It("RayJob's JobDeploymentStatus transitions from New to Initializing.", func() {
 			Eventually(
-				isAllPodsRunning(ctx, headPods, client.MatchingLabels{
-					utils.RayClusterLabelKey:   mySuspendedRayCluster.Name,
-					utils.RayNodeGroupLabelKey: utils.RayNodeHeadGroupLabelValue,
-				}, "default"),
-				time.Second*15, time.Millisecond*500).Should(Equal(true), "Head Pod should be running.")
+				getRayJobDeploymentStatus(ctx, rayJob),
+				time.Second*3, time.Millisecond*500).Should(Equal(rayv1.JobDeploymentStatusInitializing), "JobDeploymentStatus = %v", rayJob.Status.JobDeploymentStatus)
+		})
 
-		for _, workerPod := range workerPods.Items {
-			workerPod.Status.Phase = corev1.PodRunning
-			Expect(k8sClient.Status().Update(ctx, &workerPod)).Should(BeNil())
-		}
+		It("Make RayCluster.Status.State to be rayv1.Ready", func() {
+			updateHeadPodToRunningAndReady(ctx, rayJob.Status.RayClusterName, namespace)
+			updateWorkerPodsToRunningAndReady(ctx, rayJob.Status.RayClusterName, namespace)
+			Eventually(
+				getClusterState(ctx, namespace, rayJob.Status.RayClusterName),
+				time.Second*3, time.Millisecond*500).Should(Equal(rayv1.Ready))
+		})
 
+		It("RayJob's JobDeploymentStatus transitions from Initializing to Running.", func() {
 			Eventually(
-				isAllPodsRunning(ctx, workerPods, client.MatchingLabels{utils.RayClusterLabelKey: mySuspendedRayCluster.Name, utils.RayNodeGroupLabelKey: "small-group"}, "default"),
-				time.Second*15, time.Millisecond*500).Should(Equal(true), "All worker Pods should be running.")
+				getRayJobDeploymentStatus(ctx, rayJob),
+				time.Second*3, time.Millisecond*500).Should(Equal(rayv1.JobDeploymentStatusRunning), "JobDeploymentStatus = %v", rayJob.Status.JobDeploymentStatus)
 		})
 
-		It("Dashboard URL should be set", func() {
+		// The finalizer here is used to prevent the RayCluster from being deleted,
+		// ensuring that the RayJob remains in Suspending status once the suspend field is set to true.
+		It("Add finalizer to the RayCluster", func() {
+			err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
+				err := k8sClient.Get(ctx, common.RayJobRayClusterNamespacedName(rayJob), rayCluster)
+				if err != nil {
+					return err
+				}
+				rayCluster.Finalizers = append(rayCluster.Finalizers, "ray.io/deletion-blocker")
+				return k8sClient.Update(ctx, rayCluster)
+			})
+			Expect(err).NotTo(HaveOccurred(), "failed to add finalizer to RayCluster")
+		})
+
+		It("Suspend the RayJob", func() {
+			err := updateRayJobSuspendField(ctx, rayJob, true)
+			Expect(err).NotTo(HaveOccurred(), "failed to update RayJob")
+		})
+
+		It("RayJob's JobDeploymentStatus transitions from Running to Suspending.", func() {
 			Eventually(
-				getDashboardURLForRayJob(ctx, mySuspendedRayJob),
-				time.Second*3, time.Millisecond*500).Should(HavePrefix(mySuspendedRayJob.Name), "Dashboard URL = %v", mySuspendedRayJob.Status.DashboardURL)
+				getRayJobDeploymentStatus(ctx, rayJob),
+				time.Second*3, time.Millisecond*500).Should(Equal(rayv1.JobDeploymentStatusSuspending), "JobDeploymentStatus = %v", rayJob.Status.JobDeploymentStatus)
 		})
 
-		It("should create the underlying Kubernetes Job object", func() {
-			underlyingK8sJob := &batchv1.Job{}
-			// The underlying Kubernetes Job should be created when the RayJob is created
+		// The suspend operation is atomic; regardless of how the user sets the suspend field at this moment, the status should be Suspending.
+		It("Change the suspend field of RayJob from true to false and then back to true.", func() {
+			err := updateRayJobSuspendField(ctx, rayJob, false)
+			Expect(err).NotTo(HaveOccurred(), "failed to update RayJob")
+			Consistently(
+				getRayJobDeploymentStatus(ctx, rayJob),
+				time.Second*3, time.Millisecond*500).Should(Equal(rayv1.JobDeploymentStatusSuspending), "JobDeploymentStatus = %v", rayJob.Status.JobDeploymentStatus)
+
+			err = updateRayJobSuspendField(ctx, rayJob, true)
+			Expect(err).NotTo(HaveOccurred(), "failed to update RayJob")
+			Consistently(
+				getRayJobDeploymentStatus(ctx, rayJob),
+				time.Second*3, time.Millisecond*500).Should(Equal(rayv1.JobDeploymentStatusSuspending), "JobDeploymentStatus = %v", rayJob.Status.JobDeploymentStatus)
+		})
+
+		It("Remove finalizer from the RayCluster", func() {
+			err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
+				err := k8sClient.Get(ctx, common.RayJobRayClusterNamespacedName(rayJob), rayCluster)
+				if err != nil {
+					return err
+				}
+				rayCluster.Finalizers = []string{}
+				return k8sClient.Update(ctx, rayCluster)
+			})
+			Expect(err).NotTo(HaveOccurred(), "failed to remove finalizer from RayCluster")
+		})
+
+		It("RayJob's JobDeploymentStatus transitions from Suspending to Suspended.", func() {
 			Eventually(
-				// k8sClient does not throw error if Job is found
-				func() error {
-					return getResourceFunc(ctx, client.ObjectKey{Name: mySuspendedRayJob.Name, Namespace: "default"}, underlyingK8sJob)()
-				},
-				time.Second*15, time.Millisecond*500).Should(BeNil(), "Expected Kubernetes job to be present")
+				getRayJobDeploymentStatus(ctx, rayJob),
+				time.Second*3, time.Millisecond*500).Should(Equal(rayv1.JobDeploymentStatusSuspended), "JobDeploymentStatus = %v", rayJob.Status.JobDeploymentStatus)
 		})
 	})
 })
diff --git a/ray-operator/controllers/ray/rayjob_controller_test.go b/ray-operator/controllers/ray/rayjob_controller_test.go
index dfea31e9ec..d2632f5d78 100644
--- a/ray-operator/controllers/ray/rayjob_controller_test.go
+++ b/ray-operator/controllers/ray/rayjob_controller_test.go
@@ -17,7 +17,6 @@ package ray
 
 import (
 	"context"
-	"fmt"
 	"time"
 
"k8s.io/apimachinery/pkg/api/resource" @@ -167,33 +166,9 @@ var _ = Context("RayJob in K8sJobMode", func() { It("Make RayCluster.Status.State to be rayv1.Ready", func() { // The RayCluster is not 'Ready' yet because Pods are not running and ready. Expect(rayCluster.Status.State).NotTo(Equal(rayv1.Ready)) - allPods := []corev1.Pod{} - // Check whether the number of worker Pods is consistent with RayCluster CR or not. - numWorkerPods := int(*rayCluster.Spec.WorkerGroupSpecs[0].Replicas) - workerFilterLabels := client.MatchingLabels{utils.RayClusterLabelKey: rayCluster.Name, utils.RayNodeGroupLabelKey: rayCluster.Spec.WorkerGroupSpecs[0].GroupName} - workerPods := corev1.PodList{} - Eventually( - listResourceFunc(ctx, &workerPods, workerFilterLabels, &client.ListOptions{Namespace: namespace}), - time.Second*3, time.Millisecond*500).Should(Equal(int(numWorkerPods)), fmt.Sprintf("workerGroup: %v", workerPods.Items)) - - // The number of head Pods should be 1. - headPods := corev1.PodList{} - headFilterLabels := client.MatchingLabels{utils.RayClusterLabelKey: rayCluster.Name, utils.RayNodeGroupLabelKey: utils.RayNodeHeadGroupLabelValue} - Eventually( - listResourceFunc(ctx, &headPods, headFilterLabels, &client.ListOptions{Namespace: namespace}), - time.Second*3, time.Millisecond*500).Should(Equal(1), fmt.Sprintf("head Pod: %v", headPods.Items)) - - // Update all Pods, including head and worker Pods, to Running and PodReady. - allPods = append(allPods, headPods.Items...) - allPods = append(allPods, workerPods.Items...) - - for _, pod := range allPods { - pod.Status.Phase = corev1.PodRunning - // In envtest, if Pod.Status.Phase is set to running, the Pod's PodReady condition becomes true automatically. - // Check https://github.com/ray-project/kuberay/issues/1736 for more details. - Expect(k8sClient.Status().Update(ctx, &pod)).Should(BeNil()) - } + updateHeadPodToRunningAndReady(ctx, rayJob.Status.RayClusterName, namespace) + updateWorkerPodsToRunningAndReady(ctx, rayJob.Status.RayClusterName, namespace) // The RayCluster.Status.State should be Ready. Eventually( diff --git a/ray-operator/controllers/ray/rayservice_controller_test.go b/ray-operator/controllers/ray/rayservice_controller_test.go index 8bd4eb3258..90172ca4c1 100644 --- a/ray-operator/controllers/ray/rayservice_controller_test.go +++ b/ray-operator/controllers/ray/rayservice_controller_test.go @@ -233,7 +233,7 @@ var _ = Context("Inside the default namespace", func() { pendingRayClusterName := myRayService.Status.PendingServiceStatus.RayClusterName // Update the status of the head Pod to Running. - updateHeadPodToRunningAndReady(ctx, pendingRayClusterName) + updateHeadPodToRunningAndReady(ctx, pendingRayClusterName, "default") // Make sure the pending RayCluster becomes the active RayCluster. Eventually( @@ -304,7 +304,7 @@ var _ = Context("Inside the default namespace", func() { pendingRayClusterName := myRayService.Status.PendingServiceStatus.RayClusterName // Update the status of the head Pod to Running. - updateHeadPodToRunningAndReady(ctx, pendingRayClusterName) + updateHeadPodToRunningAndReady(ctx, pendingRayClusterName, "default") // Confirm switch to a new Ray Cluster. Eventually( @@ -424,7 +424,7 @@ var _ = Context("Inside the default namespace", func() { // The pending RayCluster will become the active RayCluster after: // (1) The pending RayCluster's head Pod becomes Running and Ready // (2) The pending RayCluster's Serve Deployments are HEALTHY. 
- updateHeadPodToRunningAndReady(ctx, initialPendingClusterName) + updateHeadPodToRunningAndReady(ctx, initialPendingClusterName, "default") healthyStatus := generateServeStatus(rayv1.DeploymentStatusEnum.HEALTHY, rayv1.ApplicationStatusEnum.RUNNING) fakeRayDashboardClient.SetMultiApplicationStatuses(map[string]*utils.ServeApplicationStatus{testServeAppName: &healthyStatus}) Eventually( @@ -521,7 +521,7 @@ var _ = Context("Inside the default namespace", func() { // The pending RayCluster will become the active RayCluster after: // (1) The pending RayCluster's head Pod becomes Running and Ready // (2) The pending RayCluster's Serve Deployments are HEALTHY. - updateHeadPodToRunningAndReady(ctx, initialPendingClusterName) + updateHeadPodToRunningAndReady(ctx, initialPendingClusterName, "default") healthyStatus := generateServeStatus(rayv1.DeploymentStatusEnum.HEALTHY, rayv1.ApplicationStatusEnum.RUNNING) fakeRayDashboardClient.SetMultiApplicationStatuses(map[string]*utils.ServeApplicationStatus{testServeAppName: &healthyStatus}) Eventually( @@ -667,7 +667,7 @@ var _ = Context("Inside the default namespace", func() { // The cluster should switch once the deployments are finished updating healthyStatus := generateServeStatus(rayv1.DeploymentStatusEnum.HEALTHY, rayv1.ApplicationStatusEnum.RUNNING) fakeRayDashboardClient.SetMultiApplicationStatuses(map[string]*utils.ServeApplicationStatus{testServeAppName: &healthyStatus}) - updateHeadPodToRunningAndReady(ctx, pendingRayClusterName) + updateHeadPodToRunningAndReady(ctx, pendingRayClusterName, "default") Eventually( getRayClusterNameFunc(ctx, myRayService), diff --git a/ray-operator/controllers/ray/suite_helpers_test.go b/ray-operator/controllers/ray/suite_helpers_test.go index e3c31b22da..f20ba90b3e 100644 --- a/ray-operator/controllers/ray/suite_helpers_test.go +++ b/ray-operator/controllers/ray/suite_helpers_test.go @@ -49,10 +49,6 @@ func getClusterState(ctx context.Context, namespace string, clusterName string) } } -func isAllPodsRunning(ctx context.Context, podlist corev1.PodList, filterLabels client.MatchingLabels, namespace string) bool { - return isAllPodsRunningByFilters(ctx, podlist, filterLabels, &client.ListOptions{Namespace: namespace}) -} - func isAllPodsRunningByFilters(ctx context.Context, podlist corev1.PodList, opt ...client.ListOption) bool { err := k8sClient.List(ctx, &podlist, opt...) gomega.Expect(err).ShouldNot(gomega.HaveOccurred(), "failed to list Pods") @@ -207,10 +203,10 @@ func checkServiceHealth(ctx context.Context, rayService *rayv1.RayService) func( // There's no container runtime or any other K8s controllers. // So Pods are created, but no controller updates them from Pending to Running. // See https://book.kubebuilder.io/reference/envtest.html for more details. 
-func updateHeadPodToRunningAndReady(ctx context.Context, rayClusterName string) { +func updateHeadPodToRunningAndReady(ctx context.Context, rayClusterName string, namespace string) { var instance rayv1.RayCluster gomega.Eventually( - getResourceFunc(ctx, client.ObjectKey{Name: rayClusterName, Namespace: "default"}, &instance), + getResourceFunc(ctx, client.ObjectKey{Name: rayClusterName, Namespace: namespace}, &instance), time.Second*3, time.Millisecond*500).Should(gomega.BeNil(), "RayCluster %v not found", rayClusterName) headPods := corev1.PodList{} @@ -218,7 +214,7 @@ func updateHeadPodToRunningAndReady(ctx context.Context, rayClusterName string) gomega.Eventually( listResourceFunc(ctx, &headPods, headLabels...), - time.Second*15, time.Millisecond*500).Should(gomega.Equal(1), "Head pod list should have only 1 Pod = %v", headPods.Items) + time.Second*3, time.Millisecond*500).Should(gomega.Equal(1), "Head pod list should have only 1 Pod = %v", headPods.Items) headPod := headPods.Items[0] headPod.Status.Phase = corev1.PodRunning @@ -228,9 +224,7 @@ func updateHeadPodToRunningAndReady(ctx context.Context, rayClusterName string) Status: corev1.ConditionTrue, }, } - err := retry.RetryOnConflict(retry.DefaultRetry, func() error { - return k8sClient.Status().Update(ctx, &headPod) - }) + err := k8sClient.Status().Update(ctx, &headPod) gomega.Expect(err).NotTo(gomega.HaveOccurred(), "Failed to update head Pod status to PodRunning") // Make sure the head Pod is updated. @@ -238,3 +232,47 @@ func updateHeadPodToRunningAndReady(ctx context.Context, rayClusterName string) isAllPodsRunningByFilters(ctx, headPods, headLabels...), time.Second*15, time.Millisecond*500).Should(gomega.BeTrue(), "Head Pod should be running: %v", headPods.Items) } + +// Update the status of the worker Pods to Running and Ready. Similar to updateHeadPodToRunningAndReady. +func updateWorkerPodsToRunningAndReady(ctx context.Context, rayClusterName string, namespace string) { + rayCluster := &rayv1.RayCluster{} + gomega.Eventually( + getResourceFunc(ctx, client.ObjectKey{Name: rayClusterName, Namespace: namespace}, rayCluster), + time.Second*3, time.Millisecond*500).Should(gomega.BeNil(), "RayCluster %v not found", rayClusterName) + + workerPods := corev1.PodList{} + workerLabels := common.RayClusterWorkerPodsAssociationOptions(rayCluster).ToListOptions() + numWorkerPods := int(*rayCluster.Spec.WorkerGroupSpecs[0].Replicas) + + gomega.Eventually( + listResourceFunc(ctx, &workerPods, workerLabels...), + time.Second*3, time.Millisecond*500).Should(gomega.Equal(int(numWorkerPods)), "workerGroup: %v", workerPods.Items) + + for _, pod := range workerPods.Items { + pod.Status.Phase = corev1.PodRunning + pod.Status.Conditions = []corev1.PodCondition{ + { + Type: corev1.PodReady, + Status: corev1.ConditionTrue, + }, + } + err := k8sClient.Status().Update(ctx, &pod) + gomega.Expect(err).NotTo(gomega.HaveOccurred(), "Failed to update worker Pod status to PodRunning") + } + + // Make sure all worker Pods are updated. 
+ gomega.Eventually( + isAllPodsRunningByFilters(ctx, workerPods, workerLabels...), + time.Second*3, time.Millisecond*500).Should(gomega.BeTrue(), "Worker Pods should be running: %v", workerPods.Items) +} + +func updateRayJobSuspendField(ctx context.Context, rayJob *rayv1.RayJob, suspend bool) error { + return retry.RetryOnConflict(retry.DefaultRetry, func() error { + err := k8sClient.Get(ctx, client.ObjectKey{Namespace: rayJob.Namespace, Name: rayJob.Name}, rayJob) + if err != nil { + return err + } + rayJob.Spec.Suspend = suspend + return k8sClient.Update(ctx, rayJob) + }) +}
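
The atomic-suspend behavior the new tests assert can also be exercised outside envtest. The sketch below is illustrative only and is not part of this commit: the package and helper names (sketch, setSuspend, waitForDeploymentStatus), the client value `c`, and the 500 ms polling interval are invented for the example; it reuses only APIs that already appear in the patch (retry.RetryOnConflict, the controller-runtime client, and the rayv1 RayJob types and JobDeploymentStatus constants).

// Illustrative sketch only; not part of the patch. Assumes a configured
// controller-runtime client and the KubeRay rayv1 API types on the scheme.
package sketch

import (
	"context"
	"fmt"
	"time"

	rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
	"k8s.io/client-go/util/retry"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// setSuspend flips spec.suspend with a conflict-retry read-modify-write,
// mirroring the updateRayJobSuspendField test helper added in this patch.
func setSuspend(ctx context.Context, c client.Client, key client.ObjectKey, suspend bool) error {
	return retry.RetryOnConflict(retry.DefaultRetry, func() error {
		rayJob := &rayv1.RayJob{}
		if err := c.Get(ctx, key, rayJob); err != nil {
			return err
		}
		rayJob.Spec.Suspend = suspend
		return c.Update(ctx, rayJob)
	})
}

// waitForDeploymentStatus polls status.jobDeploymentStatus until it equals want
// or the timeout expires. Once the controller reports Suspending, further flips
// of spec.suspend should not move the status until it reaches Suspended.
func waitForDeploymentStatus(ctx context.Context, c client.Client, key client.ObjectKey, want rayv1.JobDeploymentStatus, timeout time.Duration) error {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		rayJob := &rayv1.RayJob{}
		if err := c.Get(ctx, key, rayJob); err != nil {
			return err
		}
		if rayJob.Status.JobDeploymentStatus == want {
			return nil
		}
		time.Sleep(500 * time.Millisecond)
	}
	return fmt.Errorf("RayJob %s/%s did not reach %s within %s", key.Namespace, key.Name, want, timeout)
}

The conflict-retry wrapper is the point of the design: the RayJob controller keeps writing status on the same object while the suspend field is toggled, so an Update on a stale copy can fail with a 409 conflict; wrapping the read-modify-write in retry.RetryOnConflict, as the new updateRayJobSuspendField helper does, keeps the toggle reliable.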