[Bug][RayJob] Check dashboard readiness before creating job pod (#1381) (#1429)

* [Bug][RayJob] Check dashboard readiness before creating job pod (#1381)

* [Bug][RayJob] Enhance the RayJob end-to-end tests to detect bugs similar to those described in (#1381)
rueian authored Sep 26, 2023
1 parent 6b12c18 commit 5da4a04
Showing 6 changed files with 81 additions and 10 deletions.
1 change: 1 addition & 0 deletions ray-operator/apis/ray/v1alpha1/rayjob_types.go
@@ -36,6 +36,7 @@ const (
     JobDeploymentStatusInitializing JobDeploymentStatus = "Initializing"
     JobDeploymentStatusFailedToGetOrCreateRayCluster JobDeploymentStatus = "FailedToGetOrCreateRayCluster"
     JobDeploymentStatusWaitForDashboard JobDeploymentStatus = "WaitForDashboard"
+    JobDeploymentStatusWaitForDashboardReady JobDeploymentStatus = "WaitForDashboardReady"
     JobDeploymentStatusWaitForK8sJob JobDeploymentStatus = "WaitForK8sJob"
     JobDeploymentStatusFailedJobDeploy JobDeploymentStatus = "FailedJobDeploy"
     JobDeploymentStatusRunning JobDeploymentStatus = "Running"
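Note on the state machine: the new `WaitForDashboardReady` value sits between `WaitForDashboard` (no dashboard URL resolved yet) and `WaitForK8sJob` (submitter Job created). A minimal sketch of how downstream code might tell the two wait states apart, assuming the kuberay `v1alpha1` package is on the import path; the `describe` helper is illustrative, not from this commit:

```go
package main

import (
	"fmt"

	rayv1alpha1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1alpha1"
)

// describe spells out what each dashboard-related wait state means.
func describe(s rayv1alpha1.JobDeploymentStatus) string {
	switch s {
	case rayv1alpha1.JobDeploymentStatusWaitForDashboard:
		return "no dashboard URL has been resolved from the head service yet"
	case rayv1alpha1.JobDeploymentStatusWaitForDashboardReady:
		return "dashboard URL is known, but the dashboard is not serving requests yet"
	default:
		return fmt.Sprintf("not related to dashboard startup: %s", s)
	}
}

func main() {
	fmt.Println(describe(rayv1alpha1.JobDeploymentStatusWaitForDashboardReady))
}
```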
17 changes: 12 additions & 5 deletions ray-operator/controllers/ray/rayjob_controller.go
@@ -184,8 +184,8 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
     // Always update RayClusterStatus along with jobStatus and jobDeploymentStatus updates.
     rayJobInstance.Status.RayClusterStatus = rayClusterInstance.Status
 
-    clientURL := rayJobInstance.Status.DashboardURL
-    if clientURL == "" {
+    rayDashboardClient := utils.GetRayDashboardClientFunc()
+    if clientURL := rayJobInstance.Status.DashboardURL; clientURL == "" {
         // TODO: dashboard service may be changed. Check it instead of using the same URL always
         if clientURL, err = utils.FetchHeadServiceURL(ctx, &r.Log, r.Client, rayClusterInstance, common.DashboardPortName); err != nil || clientURL == "" {
             if clientURL == "" {
@@ -194,12 +194,19 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
             err = r.updateState(ctx, rayJobInstance, nil, rayJobInstance.Status.JobStatus, rayv1alpha1.JobDeploymentStatusWaitForDashboard, err)
             return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
         }
+        // Check the dashboard readiness by checking the err return from rayDashboardClient.GetJobInfo.
+        // Note that rayDashboardClient.GetJobInfo returns no error in the case of http.StatusNotFound.
+        // This check is a workaround for https://github.com/ray-project/kuberay/issues/1381.
+        rayDashboardClient.InitClient(clientURL)
+        if _, err = rayDashboardClient.GetJobInfo(ctx, rayJobInstance.Status.JobId); err != nil {
+            err = r.updateState(ctx, rayJobInstance, nil, rayJobInstance.Status.JobStatus, rayv1alpha1.JobDeploymentStatusWaitForDashboardReady, err)
+            return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
+        }
         rayJobInstance.Status.DashboardURL = clientURL
+    } else {
+        rayDashboardClient.InitClient(clientURL)
     }
-
-    rayDashboardClient := utils.GetRayDashboardClientFunc()
-    rayDashboardClient.InitClient(clientURL)
 
     // Check the current status of ray cluster before submitting.
     if rayClusterInstance.Status.State != rayv1alpha1.Ready {
         r.Log.Info("waiting for the cluster to be ready", "rayCluster", rayClusterInstance.Name)
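The three added comment lines are the heart of the fix: `GetJobInfo` maps HTTP 404 to a nil error (the job simply has not been submitted yet), so any non-nil error means the dashboard itself is unreachable, and the reconciler requeues instead of creating the submitter pod. A self-contained sketch of that probe against a raw dashboard endpoint; the `/api/jobs/` path and the `isDashboardReady` helper are illustrative assumptions, not code from this commit:

```go
package main

import (
	"context"
	"fmt"
	"net/http"
	"time"
)

// isDashboardReady mirrors the commit's readiness probe: ask the dashboard
// about a job and treat "404 job not found" as proof the dashboard is up;
// only transport errors or other status codes mean "not ready".
func isDashboardReady(ctx context.Context, dashboardURL, jobID string) bool {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet,
		dashboardURL+"/api/jobs/"+jobID, nil) // path assumed for illustration
	if err != nil {
		return false
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return false // e.g. connection refused: dashboard not listening yet
	}
	defer resp.Body.Close()
	// 200 (job known) and 404 (job not submitted yet) both prove the
	// dashboard is serving; anything else is treated as not ready.
	return resp.StatusCode == http.StatusOK || resp.StatusCode == http.StatusNotFound
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	fmt.Println(isDashboardReady(ctx, "http://127.0.0.1:8265", "demo-job"))
}
```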
47 changes: 47 additions & 0 deletions ray-operator/controllers/ray/rayjob_controller_test.go
@@ -17,12 +17,14 @@ package ray
 
 import (
     "context"
+    "errors"
     "fmt"
     "time"
 
     "k8s.io/apimachinery/pkg/api/resource"
 
     "github.com/ray-project/kuberay/ray-operator/controllers/ray/common"
+    "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
 
     . "github.com/onsi/ginkgo"
     . "github.com/onsi/gomega"
@@ -360,6 +362,51 @@ var _ = Context("Inside the default namespace with autoscaler", func() {
     })
 })
 
+var _ = Context("With a delayed dashboard client", func() {
+    ctx := context.TODO()
+    myRayJob := myRayJob.DeepCopy()
+    myRayJob.Name = "rayjob-delayed-dashboard"
+
+    mockedGetJobInfo := func(_ context.Context, jobId string) (*utils.RayJobInfo, error) {
+        return nil, errors.New("dashboard is not ready")
+    }
+
+    Describe("When creating a rayjob", func() {
+        It("should create a rayjob object", func() {
+            // Set up the mock before creating the RayJob so the first reconcile sees an unready dashboard.
+            utils.GetRayDashboardClientFunc().(*utils.FakeRayDashboardClient).GetJobInfoMock.Store(&mockedGetJobInfo)
+            err := k8sClient.Create(ctx, myRayJob)
+            Expect(err).NotTo(HaveOccurred(), "failed to create test RayJob resource")
+        })
+
+        It("should see a rayjob object with JobDeploymentStatusWaitForDashboardReady", func() {
+            Eventually(
+                getJobDeploymentStatusOfRayJob(ctx, myRayJob),
+                time.Second*3, time.Millisecond*500).Should(Equal(rayv1alpha1.JobDeploymentStatusWaitForDashboardReady), "My myRayJob = %v", myRayJob.Name)
+        })
+
+        It("Dashboard URL should be set and deployment status should leave JobDeploymentStatusWaitForDashboardReady", func() {
+            // Clear the mock to restore the fake client's default behavior.
+            utils.GetRayDashboardClientFunc().(*utils.FakeRayDashboardClient).GetJobInfoMock.Store(nil)
+            Eventually(
+                getDashboardURLForRayJob(ctx, myRayJob),
+                time.Second*15, time.Millisecond*500).Should(HavePrefix(myRayJob.Name), "Dashboard URL = %v", myRayJob.Status.DashboardURL)
+            Eventually(
+                getJobDeploymentStatusOfRayJob(ctx, myRayJob),
+                time.Second*3, time.Millisecond*500).Should(Not(Equal(rayv1alpha1.JobDeploymentStatusWaitForDashboardReady)), "My myRayJob = %v", myRayJob.Name)
+        })
+    })
+})
+
+func getJobDeploymentStatusOfRayJob(ctx context.Context, rayJob *rayv1alpha1.RayJob) func() (rayv1alpha1.JobDeploymentStatus, error) {
+    return func() (rayv1alpha1.JobDeploymentStatus, error) {
+        if err := k8sClient.Get(ctx, client.ObjectKey{Name: rayJob.Name, Namespace: "default"}, rayJob); err != nil {
+            return "", err
+        }
+        return rayJob.Status.JobDeploymentStatus, nil
+    }
+}
+
 func getRayClusterNameForRayJob(ctx context.Context, rayJob *rayv1alpha1.RayJob) func() (string, error) {
     return func() (string, error) {
         if err := k8sClient.Get(ctx, client.ObjectKey{Name: rayJob.Name, Namespace: "default"}, rayJob); err != nil {
6 changes: 3 additions & 3 deletions ray-operator/controllers/ray/rayservice_controller_test.go
@@ -258,7 +258,7 @@ applications:
     fakeRayDashboardClient := prepareFakeRayDashboardClient()
 
     utils.GetRayDashboardClientFunc = func() utils.RayDashboardClientInterface {
-        return &fakeRayDashboardClient
+        return fakeRayDashboardClient
     }
 
     utils.GetRayHttpProxyClientFunc = utils.GetFakeRayHttpProxyClient
@@ -648,8 +648,8 @@ applications:
     })
 })
 
-func prepareFakeRayDashboardClient() utils.FakeRayDashboardClient {
-    client := utils.FakeRayDashboardClient{}
+func prepareFakeRayDashboardClient() *utils.FakeRayDashboardClient {
+    client := &utils.FakeRayDashboardClient{}
 
     client.SetSingleApplicationStatus(generateServeStatus(rayv1alpha1.DeploymentStatusEnum.HEALTHY, rayv1alpha1.ApplicationStatusEnum.RUNNING))
 
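Why this file changed at all: `FakeRayDashboardClient` gains an `atomic.Pointer` field in the next file, and `sync/atomic` types embed a no-copy marker, so (presumably for that reason) the fake is now constructed once and shared by pointer rather than returned by value, which would copy it on every call and trip `go vet`'s copylocks check. A minimal sketch of the pattern, with names local to this example (Go 1.19+):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Fake holds an atomic.Pointer; copying a Fake by value after first use is
// unsafe and is flagged by go vet's copylocks analyzer.
type Fake struct {
	hook atomic.Pointer[string]
}

// newFake returns a pointer so every caller shares the same atomic field.
func newFake() *Fake { return &Fake{} }

func main() {
	f := newFake()
	msg := "mocked"
	f.hook.Store(&msg)
	if p := f.hook.Load(); p != nil {
		fmt.Println(*p) // prints "mocked"
	}
}
```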
8 changes: 7 additions & 1 deletion ray-operator/controllers/ray/utils/fake_serve_httpclient.go
@@ -4,6 +4,7 @@ import (
     "context"
     "fmt"
     "net/http"
+    "sync/atomic"
 
     "github.com/go-logr/logr"
 
@@ -15,6 +16,8 @@ type FakeRayDashboardClient struct {
     singleAppStatus ServeApplicationStatus
     multiAppStatuses map[string]*ServeApplicationStatus
     serveDetails ServeDetails
+
+    GetJobInfoMock atomic.Pointer[func(context.Context, string) (*RayJobInfo, error)]
 }
 
 var _ RayDashboardClientInterface = (*FakeRayDashboardClient)(nil)
@@ -53,7 +56,10 @@ func (r *FakeRayDashboardClient) SetMultiApplicationStatuses(statuses map[string
     r.multiAppStatuses = statuses
 }
 
-func (r *FakeRayDashboardClient) GetJobInfo(_ context.Context, jobId string) (*RayJobInfo, error) {
+func (r *FakeRayDashboardClient) GetJobInfo(ctx context.Context, jobId string) (*RayJobInfo, error) {
+    if mock := r.GetJobInfoMock.Load(); mock != nil {
+        return (*mock)(ctx, jobId)
+    }
     return nil, nil
 }
 
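The `atomic.Pointer` lets a test swap `GetJobInfo` behavior while reconcile loops may be calling it concurrently. A trimmed, self-contained sketch of the swap pattern used by the new test above (`RayJobInfo` reduced to a stub here):

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync/atomic"
)

// RayJobInfo is a stub standing in for the real struct.
type RayJobInfo struct{ JobStatus string }

type fakeClient struct {
	GetJobInfoMock atomic.Pointer[func(context.Context, string) (*RayJobInfo, error)]
}

func (f *fakeClient) GetJobInfo(ctx context.Context, jobId string) (*RayJobInfo, error) {
	if mock := f.GetJobInfoMock.Load(); mock != nil {
		return (*mock)(ctx, jobId)
	}
	return nil, nil // default: dashboard up, job not found yet
}

func main() {
	f := &fakeClient{}
	failing := func(context.Context, string) (*RayJobInfo, error) {
		return nil, errors.New("dashboard is not ready")
	}
	f.GetJobInfoMock.Store(&failing) // callers now see a dead dashboard
	_, err := f.GetJobInfo(context.Background(), "job-1")
	fmt.Println(err) // dashboard is not ready

	f.GetJobInfoMock.Store(nil) // atomically restore default behavior
	info, err := f.GetJobInfo(context.Background(), "job-1")
	fmt.Println(info, err) // <nil> <nil>
}
```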
12 changes: 11 additions & 1 deletion tests/framework/prototype.py
@@ -491,11 +491,15 @@ def wait(self):
         expected_head_pods = get_expected_head_pods(self.custom_resource_object)
         expected_worker_pods = get_expected_worker_pods(self.custom_resource_object)
         expected_rayclusters = 1
+        expected_job_pods = 1
         # Wait until:
         # (1) The number of head pods and worker pods are as expected.
         # (2) All head pods and worker pods are "Running".
         # (3) A RayCluster has been created.
-        # (4) RayJob named "rayjob-sample" has status "SUCCEEDED".
+        # (4) Exactly one Job pod has been created.
+        # (5) RayJob named "rayjob-sample" has status "SUCCEEDED".
+        # We check the `expected_job_pods = 1` condition to catch situations described in
+        # https://github.com/ray-project/kuberay/issues/1381
         converge = False
         k8s_v1_api = K8S_CLUSTER_MANAGER.k8s_client_dict[CONST.K8S_V1_CLIENT_KEY]
         custom_api = K8S_CLUSTER_MANAGER.k8s_client_dict[CONST.K8S_CR_CLIENT_KEY]
@@ -509,9 +513,12 @@ def wait(self):
                 namespace = self.namespace, label_selector='ray.io/node-type=worker')
             rayjob = get_custom_object(CONST.RAY_JOB_CRD, self.namespace,
                 self.custom_resource_object['metadata']['name'])
+            jobpods = k8s_v1_api.list_namespaced_pod(
+                namespace = self.namespace, label_selector='job-name='+self.custom_resource_object['metadata']['name'])
 
             if (len(headpods.items) == expected_head_pods
                     and len(workerpods.items) == expected_worker_pods
+                    and len(jobpods.items) == expected_job_pods
                     and check_pod_running(headpods.items) and check_pod_running(workerpods.items)
                     and rayjob.get('status') is not None
                     and rayjob.get('status').get('jobStatus') == "SUCCEEDED"
@@ -532,6 +539,9 @@ def wait(self):
                 if len(workerpods.items) != expected_worker_pods:
                     logger.info("expected_worker_pods: %d, actual_worker_pods: %d",
                         expected_worker_pods, len(workerpods.items))
+                if len(jobpods.items) != expected_job_pods:
+                    logger.info("expected_job_pods: %d, actual_job_pods: %d",
+                        expected_job_pods, len(jobpods.items))
                 if not check_pod_running(headpods.items):
                     logger.info("head pods are not running yet.")
                 if not check_pod_running(workerpods.items):
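The Python check counts pods carrying the `job-name=<rayjob-name>` label, which the Kubernetes Job controller stamps on the submitter pod it creates for a RayJob; seeing more than one such pod (or one created too early) is exactly the #1381 symptom. For reference, a sketch of the equivalent check in Go with client-go; the kubeconfig loading and the `rayjob-sample` name are assumptions for illustration:

```go
package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

// countJobPods lists pods labeled with the Job name; exactly one submitter
// pod should exist for a healthy RayJob.
func countJobPods(ctx context.Context, cs kubernetes.Interface, namespace, rayJobName string) (int, error) {
	pods, err := cs.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{
		LabelSelector: "job-name=" + rayJobName,
	})
	if err != nil {
		return 0, err
	}
	return len(pods.Items), nil
}

func main() {
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	cs, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}
	n, err := countJobPods(context.Background(), cs, "default", "rayjob-sample")
	fmt.Println(n, err) // expect 1 once the submitter pod is up
}
```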
