diff --git a/helm-chart/kuberay-operator/crds/ray.io_rayservices.yaml b/helm-chart/kuberay-operator/crds/ray.io_rayservices.yaml index 3a548918e12..4f05b6db10f 100644 --- a/helm-chart/kuberay-operator/crds/ray.io_rayservices.yaml +++ b/helm-chart/kuberay-operator/crds/ray.io_rayservices.yaml @@ -7227,6 +7227,9 @@ spec: type: object status: properties: + NumServeEndpoints: + format: int32 + type: integer activeServiceStatus: properties: applicationStatuses: diff --git a/helm-chart/kuberay-operator/templates/multiple_namespaces_role.yaml b/helm-chart/kuberay-operator/templates/multiple_namespaces_role.yaml index 488889d508a..4925893bce6 100644 --- a/helm-chart/kuberay-operator/templates/multiple_namespaces_role.yaml +++ b/helm-chart/kuberay-operator/templates/multiple_namespaces_role.yaml @@ -32,6 +32,13 @@ rules: - get - list - update +- apiGroups: + - "" + resources: + - endpoints + verbs: + - get + - list - apiGroups: - "" resources: diff --git a/helm-chart/kuberay-operator/templates/role.yaml b/helm-chart/kuberay-operator/templates/role.yaml index 8c5e0a27d75..cc1e3bf3752 100644 --- a/helm-chart/kuberay-operator/templates/role.yaml +++ b/helm-chart/kuberay-operator/templates/role.yaml @@ -28,6 +28,13 @@ rules: - get - list - update +- apiGroups: + - "" + resources: + - endpoints + verbs: + - get + - list - apiGroups: - "" resources: diff --git a/ray-operator/apis/ray/v1/rayservice_types.go b/ray-operator/apis/ray/v1/rayservice_types.go index 5ace7a92c5c..4d94847a4c3 100644 --- a/ray-operator/apis/ray/v1/rayservice_types.go +++ b/ray-operator/apis/ray/v1/rayservice_types.go @@ -71,6 +71,9 @@ type RayServiceStatuses struct { PendingServiceStatus RayServiceStatus `json:"pendingServiceStatus,omitempty"` // ServiceStatus indicates the current RayService status. ServiceStatus ServiceStatus `json:"serviceStatus,omitempty"` + // NumServeEndpoints indicates the number of Ray Pods that are actively serving or have been selected by the serve service. + // Ray Pods without a proxy actor or those that are unhealthy will not be counted. + NumServeEndpoints int32 `json:"NumServeEndpoints,omitempty"` // observedGeneration is the most recent generation observed for this RayService. It corresponds to the // RayService's generation, which is updated on mutation by the API Server. // +optional diff --git a/ray-operator/config/crd/bases/ray.io_rayservices.yaml b/ray-operator/config/crd/bases/ray.io_rayservices.yaml index 3a548918e12..4f05b6db10f 100644 --- a/ray-operator/config/crd/bases/ray.io_rayservices.yaml +++ b/ray-operator/config/crd/bases/ray.io_rayservices.yaml @@ -7227,6 +7227,9 @@ spec: type: object status: properties: + NumServeEndpoints: + format: int32 + type: integer activeServiceStatus: properties: applicationStatuses: diff --git a/ray-operator/config/rbac/role.yaml b/ray-operator/config/rbac/role.yaml index 5877cf9edb6..c15fdd9bfcc 100644 --- a/ray-operator/config/rbac/role.yaml +++ b/ray-operator/config/rbac/role.yaml @@ -25,6 +25,13 @@ rules: - get - list - update +- apiGroups: + - "" + resources: + - endpoints + verbs: + - get + - list - apiGroups: - "" resources: diff --git a/ray-operator/controllers/ray/rayservice_controller.go b/ray-operator/controllers/ray/rayservice_controller.go index 4fd509adffd..abb5e243a91 100644 --- a/ray-operator/controllers/ray/rayservice_controller.go +++ b/ray-operator/controllers/ray/rayservice_controller.go @@ -82,6 +82,7 @@ func NewRayServiceReconciler(mgr manager.Manager, dashboardClientFunc func() uti // +kubebuilder:rbac:groups=core,resources=events,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=core,resources=pods/status,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups=core,resources=endpoints,verbs=get;list // +kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=core,resources=services/status,verbs=get;update;patch // +kubebuilder:rbac:groups=coordination.k8s.io,resources=leases,verbs=get;list;create;update @@ -212,6 +213,9 @@ func (r *RayServiceReconciler) Reconcile(ctx context.Context, request ctrl.Reque } } + serveEndPoints, getServeEndPointsErr := r.getServeEndPoints(ctx, rayServiceInstance, rayClusterInstance) + calculateStatusForNumServeEndpoints(serveEndPoints, getServeEndPointsErr, rayServiceInstance) + // Final status update for any CR modification. if r.inconsistentRayServiceStatuses(originalRayServiceInstance.Status, rayServiceInstance.Status) { rayServiceInstance.Status.LastUpdateTime = &metav1.Time{Time: time.Now()} @@ -221,9 +225,38 @@ func (r *RayServiceReconciler) Reconcile(ctx context.Context, request ctrl.Reque } } + if getServeEndPointsErr != nil { + return ctrl.Result{RequeueAfter: ServiceDefaultRequeueDuration}, getServeEndPointsErr + } return ctrl.Result{RequeueAfter: ServiceDefaultRequeueDuration}, nil } +func (r *RayServiceReconciler) getServeEndPoints(ctx context.Context, rayServiceInstance *rayv1.RayService, rayClusterInstance *rayv1.RayCluster) (*corev1.Endpoints, error) { + serveSvc, err := common.BuildServeServiceForRayService(ctx, *rayServiceInstance, *rayClusterInstance) + if err != nil { + return nil, err + } + endpoints := &corev1.Endpoints{} + if err = r.Get(ctx, client.ObjectKey{Name: serveSvc.Name, Namespace: serveSvc.Namespace}, endpoints); err == nil { + return endpoints, nil + } else if errors.IsNotFound(err) { + return nil, nil + } else { + r.Log.Error(err, "Fail to retrieve the Kubernetes Endpoints from the cluster!") + return nil, err + } +} + +func calculateStatusForNumServeEndpoints(serveEndPoints *corev1.Endpoints, getServeEndPointsErr error, rayServiceInstance *rayv1.RayService) { + numServeEndpoints := 0 + if getServeEndPointsErr == nil && serveEndPoints != nil { + for _, subset := range serveEndPoints.Subsets { + numServeEndpoints += len(subset.Addresses) + } + } + rayServiceInstance.Status.NumServeEndpoints = int32(numServeEndpoints) +} + // Checks whether the old and new RayServiceStatus are inconsistent by comparing different fields. // If the only difference between the old and new status is the HealthLastUpdateTime field, // the status update will not be triggered. @@ -285,6 +318,11 @@ func (r *RayServiceReconciler) inconsistentRayServiceStatuses(oldStatus rayv1.Ra return true } + if oldStatus.NumServeEndpoints != newStatus.NumServeEndpoints { + r.Log.Info(fmt.Sprintf("inconsistentRayServiceStatus RayService NumServeEndpoints changed from %d to %d", oldStatus.NumServeEndpoints, newStatus.NumServeEndpoints)) + return true + } + if r.inconsistentRayServiceStatus(oldStatus.ActiveServiceStatus, newStatus.ActiveServiceStatus) { r.Log.Info("inconsistentRayServiceStatus RayService ActiveServiceStatus changed") return true diff --git a/ray-operator/pkg/client/applyconfiguration/ray/v1/rayservicestatuses.go b/ray-operator/pkg/client/applyconfiguration/ray/v1/rayservicestatuses.go index cab88874e48..7b36f53fb92 100644 --- a/ray-operator/pkg/client/applyconfiguration/ray/v1/rayservicestatuses.go +++ b/ray-operator/pkg/client/applyconfiguration/ray/v1/rayservicestatuses.go @@ -13,6 +13,7 @@ type RayServiceStatusesApplyConfiguration struct { ActiveServiceStatus *RayServiceStatusApplyConfiguration `json:"activeServiceStatus,omitempty"` PendingServiceStatus *RayServiceStatusApplyConfiguration `json:"pendingServiceStatus,omitempty"` ServiceStatus *rayv1.ServiceStatus `json:"serviceStatus,omitempty"` + NumServeEndpoints *int32 `json:"NumServeEndpoints,omitempty"` ObservedGeneration *int64 `json:"observedGeneration,omitempty"` LastUpdateTime *metav1.Time `json:"lastUpdateTime,omitempty"` } @@ -47,6 +48,14 @@ func (b *RayServiceStatusesApplyConfiguration) WithServiceStatus(value rayv1.Ser return b } +// WithNumServeEndpoints sets the NumServeEndpoints field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the NumServeEndpoints field is set to the value of the last call. +func (b *RayServiceStatusesApplyConfiguration) WithNumServeEndpoints(value int32) *RayServiceStatusesApplyConfiguration { + b.NumServeEndpoints = &value + return b +} + // WithObservedGeneration sets the ObservedGeneration field in the declarative configuration to the given value // and returns the receiver, so that objects can be built by chaining "With" function invocations. // If called multiple times, the ObservedGeneration field is set to the value of the last call.