Skip to content


[RayJob] Add Tests for Atomic Suspend Operation (#2050)
Browse files Browse the repository at this point in the history
  • Loading branch information
Yicheng-Lu-llll authored Apr 12, 2024
1 parent 47b4e80 commit 1f44bdc
Show file tree
Hide file tree
Showing 4 changed files with 167 additions and 219 deletions.
289 changes: 112 additions & 177 deletions ray-operator/controllers/ray/rayjob_controller_suspended_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,242 +17,177 @@ package ray

import (



. ""
. ""

rayv1 ""

corev1 ""

batchv1 ""

metav1 ""

var _ = Context("Inside the default namespace", func() {
ctx := context.TODO()
var workerPods corev1.PodList
var headPods corev1.PodList

mySuspendedRayCluster := &rayv1.RayCluster{}

mySuspendedRayJob := &rayv1.RayJob{
ObjectMeta: metav1.ObjectMeta{
Name: "rayjob-test-suspend",
Namespace: "default",
Spec: rayv1.RayJobSpec{
ShutdownAfterJobFinishes: true,
Suspend: true,
Entrypoint: "sleep 999",
RayClusterSpec: &rayv1.RayClusterSpec{
RayVersion: "2.9.0",
HeadGroupSpec: rayv1.HeadGroupSpec{
RayStartParams: map[string]string{},
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
Name: "ray-head",
Image: "rayproject/ray:2.8.0",
Resources: corev1.ResourceRequirements{
Limits: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("1"),
corev1.ResourceMemory: resource.MustParse("2Gi"),
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("1"),
corev1.ResourceMemory: resource.MustParse("2Gi"),
Ports: []corev1.ContainerPort{
Name: "gcs-server",
ContainerPort: 6379,
Name: "dashboard",
ContainerPort: 8265,
Name: "head",
ContainerPort: 10001,
Name: "dashboard-agent",
ContainerPort: 52365,
WorkerGroupSpecs: []rayv1.WorkerGroupSpec{
Replicas: pointer.Int32(3),
MinReplicas: pointer.Int32(0),
MaxReplicas: pointer.Int32(10000),
GroupName: "small-group",
RayStartParams: map[string]string{},
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
Name: "ray-worker",
Image: "rayproject/ray:2.8.0",

var _ = Context("RayJob with suspend operation", func() {
Describe("When creating a rayjob with suspend == true", func() {
ctx := context.Background()
namespace := "default"
rayCluster := &rayv1.RayCluster{}
rayJob := rayJobTemplate("rayjob-suspend", namespace)
rayJob.Spec.Suspend = true

It("should create a rayjob object", func() {
err := k8sClient.Create(ctx, mySuspendedRayJob)
err := k8sClient.Create(ctx, rayJob)
Expect(err).NotTo(HaveOccurred(), "failed to create test RayJob resource")

It("should see a rayjob object", func() {
getResourceFunc(ctx, client.ObjectKey{Name: mySuspendedRayJob.Name, Namespace: "default"}, mySuspendedRayJob),
time.Second*3, time.Millisecond*500).Should(BeNil(), "My myRayJob = %v", mySuspendedRayJob.Name)

It("should have deployment status suspended", func() {
getRayJobDeploymentStatus(ctx, mySuspendedRayJob),
getRayJobDeploymentStatus(ctx, rayJob),
time.Second*5, time.Millisecond*500).Should(Equal(rayv1.JobDeploymentStatusSuspended))

It("should NOT create a raycluster object", func() {
getRayClusterNameForRayJob(ctx, mySuspendedRayJob),
getRayClusterNameForRayJob(ctx, rayJob),
time.Second*3, time.Millisecond*500).Should(BeEmpty())

It("should unsuspend a rayjob object", func() {
mySuspendedRayJob.Spec.Suspend = false
err := k8sClient.Update(ctx, mySuspendedRayJob)
Expect(err).NotTo(HaveOccurred(), "failed to update test RayJob resource")
err := updateRayJobSuspendField(ctx, rayJob, false)
Expect(err).NotTo(HaveOccurred(), "failed to update RayJob")

It("should create a raycluster object", func() {
// Ray Cluster name can be present on RayJob's CRD
getRayClusterNameForRayJob(ctx, mySuspendedRayJob),
getRayClusterNameForRayJob(ctx, rayJob),
time.Second*15, time.Millisecond*500).Should(Not(BeEmpty()))
// The actual cluster instance and underlying resources SHOULD be created when suspend == false
// k8sClient client does not throw error if cluster IS found
getResourceFunc(ctx, client.ObjectKey{Name: mySuspendedRayJob.Status.RayClusterName, Namespace: "default"}, mySuspendedRayCluster),
getResourceFunc(ctx, common.RayJobRayClusterNamespacedName(rayJob), rayCluster),
time.Second*3, time.Millisecond*500).Should(BeNil())

It("should create 3 workers", func() {
listResourceFunc(ctx, &workerPods, client.MatchingLabels{
utils.RayClusterLabelKey: mySuspendedRayCluster.Name,
utils.RayNodeGroupLabelKey: "small-group",
It("should NOT create the underlying K8s job yet because the cluster is not ready", func() {
underlyingK8sJob := &batchv1.Job{}
// k8sClient client throws error if resource not found
func() bool {
err := getResourceFunc(ctx, common.RayJobK8sJobNamespacedName(rayJob), underlyingK8sJob)()
return errors.IsNotFound(err)
&client.ListOptions{Namespace: "default"}),
time.Second*15, time.Millisecond*500).Should(Equal(3), fmt.Sprintf("workerGroup %v", workerPods.Items))
if len(workerPods.Items) > 0 {
Expect(workerPods.Items[0].Status.Phase).Should(Or(Equal(corev1.PodRunning), Equal(corev1.PodPending)))
time.Second*3, time.Millisecond*500).Should(BeTrue())

It("should create a head pod resource", func() {
err := k8sClient.List(ctx, &headPods,
utils.RayClusterLabelKey: mySuspendedRayCluster.Name,
utils.RayNodeGroupLabelKey: utils.RayNodeHeadGroupLabelValue,
&client.ListOptions{Namespace: "default"},

Expect(err).NotTo(HaveOccurred(), "failed list head pods")
Expect(len(headPods.Items)).Should(BeNumerically("==", 1), "My head pod list= %v", headPods.Items)
It("should be able to update all Pods to Running", func() {
updateHeadPodToRunningAndReady(ctx, rayJob.Status.RayClusterName, namespace)
updateWorkerPodsToRunningAndReady(ctx, rayJob.Status.RayClusterName, namespace)

pod := &corev1.Pod{}
if len(headPods.Items) > 0 {
pod = &headPods.Items[0]
It("Dashboard URL should be set", func() {
getResourceFunc(ctx, client.ObjectKey{Name: pod.Name, Namespace: "default"}, pod),
time.Second*3, time.Millisecond*500).Should(BeNil(), "My head pod = %v", pod)
getDashboardURLForRayJob(ctx, rayJob),
time.Second*3, time.Millisecond*500).Should(HavePrefix(rayJob.Name), "Dashboard URL = %v", rayJob.Status.DashboardURL)

It("should NOT create the underlying K8s job yet because the cluster is not ready", func() {
It("should create the underlying Kubernetes Job object", func() {
underlyingK8sJob := &batchv1.Job{}
// The underlying Kubernetes Job should be created when the RayJob is created
// k8sClient client throws error if resource not found
func() bool {
err := getResourceFunc(ctx, client.ObjectKey{Name: mySuspendedRayJob.Name, Namespace: "default"}, underlyingK8sJob)()
return errors.IsNotFound(err)
time.Second*10, time.Millisecond*500).Should(BeTrue())
getResourceFunc(ctx, common.RayJobK8sJobNamespacedName(rayJob), underlyingK8sJob),
time.Second*3, time.Millisecond*500).Should(BeNil(), "Expected Kubernetes job to be present")

It("should be able to update all Pods to Running", func() {
// We need to manually update Pod statuses otherwise they'll always be Pending.
// envtest doesn't create a full K8s cluster. It's only the control plane.
// There's no container runtime or any other K8s controllers.
// So Pods are created, but no controller updates them from Pending to Running.
// See
Describe("RayJob suspend operation shoud be atomic", func() {
ctx := context.Background()
namespace := "default"
rayJob := rayJobTemplate("rayjob-atomic-suspend", namespace)
rayCluster := &rayv1.RayCluster{}

for _, headPod := range headPods.Items {
headPod.Status.Phase = corev1.PodRunning
Expect(k8sClient.Status().Update(ctx, &headPod)).Should(BeNil())
It("Create a RayJob custom resource", func() {
err := k8sClient.Create(ctx, rayJob)
Expect(err).NotTo(HaveOccurred(), "Failed to create RayJob")

It("RayJobs's JobDeploymentStatus transitions from New to Initializing.", func() {
isAllPodsRunning(ctx, headPods, client.MatchingLabels{
utils.RayClusterLabelKey: mySuspendedRayCluster.Name,
utils.RayNodeGroupLabelKey: utils.RayNodeHeadGroupLabelValue,
}, "default"),
time.Second*15, time.Millisecond*500).Should(Equal(true), "Head Pod should be running.")
getRayJobDeploymentStatus(ctx, rayJob),
time.Second*3, time.Millisecond*500).Should(Equal(rayv1.JobDeploymentStatusInitializing), "JobDeploymentStatus = %v", rayJob.Status.JobDeploymentStatus)

for _, workerPod := range workerPods.Items {
workerPod.Status.Phase = corev1.PodRunning
Expect(k8sClient.Status().Update(ctx, &workerPod)).Should(BeNil())
It("Make RayCluster.Status.State to be rayv1.Ready", func() {
updateHeadPodToRunningAndReady(ctx, rayJob.Status.RayClusterName, namespace)
updateWorkerPodsToRunningAndReady(ctx, rayJob.Status.RayClusterName, namespace)
getClusterState(ctx, namespace, rayJob.Status.RayClusterName),
time.Second*3, time.Millisecond*500).Should(Equal(rayv1.Ready))

It("RayJobs's JobDeploymentStatus transitions from Initializing to Running.", func() {
isAllPodsRunning(ctx, workerPods, client.MatchingLabels{utils.RayClusterLabelKey: mySuspendedRayCluster.Name, utils.RayNodeGroupLabelKey: "small-group"}, "default"),
time.Second*15, time.Millisecond*500).Should(Equal(true), "All worker Pods should be running.")
getRayJobDeploymentStatus(ctx, rayJob),
time.Second*3, time.Millisecond*500).Should(Equal(rayv1.JobDeploymentStatusRunning), "JobDeploymentStatus = %v", rayJob.Status.JobDeploymentStatus)

It("Dashboard URL should be set", func() {
// The finalizer here is used to prevent the RayCluster from being deleted,
// ensuring that the RayJob remains in Suspending status once the suspend field is set to true.
It("Add finalizer to the RayCluster", func() {
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
err := k8sClient.Get(ctx, common.RayJobRayClusterNamespacedName(rayJob), rayCluster)
if err != nil {
return err
rayCluster.Finalizers = append(rayCluster.Finalizers, "")
return k8sClient.Update(ctx, rayCluster)
Expect(err).NotTo(HaveOccurred(), "failed to add finalizer to RayCluster")

It("Suspend the RayJob", func() {
err := updateRayJobSuspendField(ctx, rayJob, true)
Expect(err).NotTo(HaveOccurred(), "failed to update RayJob")

It("RayJobs's JobDeploymentStatus transitions from Running to Suspending.", func() {
getDashboardURLForRayJob(ctx, mySuspendedRayJob),
time.Second*3, time.Millisecond*500).Should(HavePrefix(mySuspendedRayJob.Name), "Dashboard URL = %v", mySuspendedRayJob.Status.DashboardURL)
getRayJobDeploymentStatus(ctx, rayJob),
time.Second*3, time.Millisecond*500).Should(Equal(rayv1.JobDeploymentStatusSuspending), "JobDeploymentStatus = %v", rayJob.Status.JobDeploymentStatus)

It("should create the underlying Kubernetes Job object", func() {
underlyingK8sJob := &batchv1.Job{}
// The underlying Kubernetes Job should be created when the RayJob is created
// The suspend operation is atomic; regardless of how the user sets the suspend field at this moment, the status should be Suspending.
It("Change the suspend field of RayJob from true to false and then back to true.", func() {
err := updateRayJobSuspendField(ctx, rayJob, false)
Expect(err).NotTo(HaveOccurred(), "failed to update RayJob")
getRayJobDeploymentStatus(ctx, rayJob),
time.Second*3, time.Millisecond*500).Should(Equal(rayv1.JobDeploymentStatusSuspending), "JobDeploymentStatus = %v", rayJob.Status.JobDeploymentStatus)

err = updateRayJobSuspendField(ctx, rayJob, true)
Expect(err).NotTo(HaveOccurred(), "failed to update RayJob")
getRayJobDeploymentStatus(ctx, rayJob),
time.Second*3, time.Millisecond*500).Should(Equal(rayv1.JobDeploymentStatusSuspending), "JobDeploymentStatus = %v", rayJob.Status.JobDeploymentStatus)

It("Remove finalizer from the RayCluster", func() {
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
err := k8sClient.Get(ctx, common.RayJobRayClusterNamespacedName(rayJob), rayCluster)
if err != nil {
return err
rayCluster.Finalizers = []string{}
return k8sClient.Update(ctx, rayCluster)
Expect(err).NotTo(HaveOccurred(), "failed to remove finalizer from RayCluster")

It("RayJobs's JobDeploymentStatus transitions from Suspending to Suspended.", func() {
// k8sClient does not throw error if Job is found
func() error {
return getResourceFunc(ctx, client.ObjectKey{Name: mySuspendedRayJob.Name, Namespace: "default"}, underlyingK8sJob)()
time.Second*15, time.Millisecond*500).Should(BeNil(), "Expected Kubernetes job to be present")
getRayJobDeploymentStatus(ctx, rayJob),
time.Second*3, time.Millisecond*500).Should(Equal(rayv1.JobDeploymentStatusSuspended), "JobDeploymentStatus = %v", rayJob.Status.JobDeploymentStatus)
29 changes: 2 additions & 27 deletions ray-operator/controllers/ray/rayjob_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package ray

import (

Expand Down Expand Up @@ -167,33 +166,9 @@ var _ = Context("RayJob in K8sJobMode", func() {
It("Make RayCluster.Status.State to be rayv1.Ready", func() {
// The RayCluster is not 'Ready' yet because Pods are not running and ready.
allPods := []corev1.Pod{}

// Check whether the number of worker Pods is consistent with RayCluster CR or not.
numWorkerPods := int(*rayCluster.Spec.WorkerGroupSpecs[0].Replicas)
workerFilterLabels := client.MatchingLabels{utils.RayClusterLabelKey: rayCluster.Name, utils.RayNodeGroupLabelKey: rayCluster.Spec.WorkerGroupSpecs[0].GroupName}
workerPods := corev1.PodList{}
listResourceFunc(ctx, &workerPods, workerFilterLabels, &client.ListOptions{Namespace: namespace}),
time.Second*3, time.Millisecond*500).Should(Equal(int(numWorkerPods)), fmt.Sprintf("workerGroup: %v", workerPods.Items))

// The number of head Pods should be 1.
headPods := corev1.PodList{}
headFilterLabels := client.MatchingLabels{utils.RayClusterLabelKey: rayCluster.Name, utils.RayNodeGroupLabelKey: utils.RayNodeHeadGroupLabelValue}
listResourceFunc(ctx, &headPods, headFilterLabels, &client.ListOptions{Namespace: namespace}),
time.Second*3, time.Millisecond*500).Should(Equal(1), fmt.Sprintf("head Pod: %v", headPods.Items))

// Update all Pods, including head and worker Pods, to Running and PodReady.
allPods = append(allPods, headPods.Items...)
allPods = append(allPods, workerPods.Items...)

for _, pod := range allPods {
pod.Status.Phase = corev1.PodRunning
// In envtest, if Pod.Status.Phase is set to running, the Pod's PodReady condition becomes true automatically.
// Check for more details.
Expect(k8sClient.Status().Update(ctx, &pod)).Should(BeNil())
updateHeadPodToRunningAndReady(ctx, rayJob.Status.RayClusterName, namespace)
updateWorkerPodsToRunningAndReady(ctx, rayJob.Status.RayClusterName, namespace)

// The RayCluster.Status.State should be Ready.
Expand Down

0 comments on commit 1f44bdc

Please sign in to comment.