[Feature] Support Volcano for batch scheduling #755

Merged: 30 commits merged on Dec 1, 2022.

Changes shown below are from 3 of the 30 commits.

Commits
4f6dc0c
Resolved conflicts
Oct 28, 2022
80e0a93
WIP: enable batch scheduler
tgaddair Nov 22, 2022
3bcadf4
Revert dockerfile
tgaddair Nov 22, 2022
510fa36
WIP
tgaddair Nov 24, 2022
976e0b0
Added volcano roles
tgaddair Nov 29, 2022
30d2b1f
WIP
tgaddair Nov 29, 2022
0c0808c
Added batch scheduler
tgaddair Nov 29, 2022
390a0db
Added batch scheduler interface
tgaddair Nov 29, 2022
0de097c
Make copyright consistent
tgaddair Nov 29, 2022
05fdd8e
Added test
tgaddair Nov 30, 2022
7b9fd97
Refactored init
tgaddair Nov 30, 2022
d466abc
Conditions in helm
tgaddair Nov 30, 2022
22d71d9
Added dep
tgaddair Nov 30, 2022
272fef2
Revert "Added dep"
tgaddair Nov 30, 2022
67b1752
Added docs
tgaddair Nov 30, 2022
538892d
Moved into controllers
tgaddair Nov 30, 2022
a77d995
fmt
tgaddair Nov 30, 2022
5e33b0b
Fixed helm
tgaddair Nov 30, 2022
a5e7efb
Fixed helm
tgaddair Nov 30, 2022
6e5077f
Update docs/guidance/volcano-integration.md
tgaddair Nov 30, 2022
e613b16
Update docs/guidance/volcano-integration.md
tgaddair Nov 30, 2022
63bd52a
Update docs/guidance/volcano-integration.md
tgaddair Nov 30, 2022
0878ffa
Apply suggestions from code review
tgaddair Nov 30, 2022
ecfcc5e
Fixed Equals
tgaddair Dec 1, 2022
37e50bd
Merge branch 'volcano-int' of https://github.com/ray-project/kuberay …
tgaddair Dec 1, 2022
8621ea3
WIP docs
tgaddair Dec 1, 2022
d894cb0
Added apiGroups
tgaddair Dec 1, 2022
af81355
Completed README
tgaddair Dec 1, 2022
224aa32
Typo
tgaddair Dec 1, 2022
09a7ff8
Updated example
tgaddair Dec 1, 2022
76 changes: 76 additions & 0 deletions ray-operator/controllers/ray/raycluster_controller.go
@@ -33,12 +33,14 @@ import (
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
vcbetav1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
)

var (
DefaultRequeueDuration = 2 * time.Second
PrioritizeWorkersToDelete bool
ForcedClusterUpgrade bool
EnableBatchScheduler bool
)

// NewReconciler returns a new reconcile.Reconciler
@@ -336,6 +338,20 @@ func (r *RayClusterReconciler) reconcilePods(instance *rayiov1alpha1.RayCluster)
if err := r.List(context.TODO(), &headPods, client.InNamespace(instance.Namespace), filterLabels); err != nil {
return err
}
if EnableBatchScheduler {
var minMember int32
var totalResource corev1.ResourceList
if instance.Spec.EnableInTreeAutoscaling == nil || !*instance.Spec.EnableInTreeAutoscaling {
minMember = utils.CalculateDesiredReplicas(instance) + *instance.Spec.HeadGroupSpec.Replicas
totalResource = utils.CalculateDesiredResources(instance)
} else {
minMember = utils.CalculateMinReplicas(instance) + *instance.Spec.HeadGroupSpec.Replicas
totalResource = utils.CalculateMinResources(instance)
}
if err := r.syncPodGroup(instance, minMember, totalResource); err != nil {
return err
}
}

Review thread (resolved):

Collaborator: Should we use minMember to indicate the min size of the gang, or use a separate annotation?

tgaddair (Contributor, Author): For Volcano, minMember is the min gang size, which we set to the total replica count (if not using autoscaling) or the total minReplicas count (when using autoscaling). Did you want to make this more configurable so that users can specify their own min gang size?
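For illustration only (not part of this PR), here is a minimal sketch of the minMember arithmetic described in the review thread above, assuming a hypothetical cluster with one head pod and two worker groups; all names and numbers are invented:

```go
package main

import "fmt"

// workerGroup mirrors only the fields relevant to gang sizing; the type is illustrative.
type workerGroup struct {
	replicas    int32 // desired replica count
	minReplicas int32 // autoscaler lower bound
}

func main() {
	headReplicas := int32(1)
	workers := []workerGroup{{replicas: 4, minReplicas: 2}, {replicas: 2, minReplicas: 1}}

	autoscalingEnabled := true
	minMember := headReplicas
	for _, wg := range workers {
		if autoscalingEnabled {
			minMember += wg.minReplicas // gang only needs the autoscaler floor
		} else {
			minMember += wg.replicas // gang needs the full desired cluster
		}
	}
	fmt.Println("PodGroup MinMember:", minMember) // 4 with autoscaling, 7 without
}
```

With autoscaling enabled the gang covers only the minimum viable cluster, so Volcano can admit the group before the autoscaler grows it.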
// Reconcile head Pod
if len(headPods.Items) == 1 {
headPod := headPods.Items[0]
@@ -654,6 +670,9 @@ func (r *RayClusterReconciler) createHeadPod(instance rayiov1alpha1.RayCluster)
Name: pod.Name,
Namespace: pod.Namespace,
}
if EnableBatchScheduler {
pod.Annotations[vcbetav1.KubeGroupNameAnnotationKey] = instance.Name
}

r.Log.Info("createHeadPod", "head pod with name", pod.GenerateName)
if err := r.Create(context.TODO(), &pod); err != nil {
@@ -682,6 +701,9 @@ func (r *RayClusterReconciler) createWorkerPod(instance rayiov1alpha1.RayCluster
Name: pod.Name,
Namespace: pod.Namespace,
}
if EnableBatchScheduler {
pod.Annotations[vcbetav1.KubeGroupNameAnnotationKey] = instance.Name
}
replica := pod
if err := r.Create(context.TODO(), &replica); err != nil {
if errors.IsAlreadyExists(err) {
@@ -771,6 +793,10 @@ func (r *RayClusterReconciler) SetupWithManager(mgr ctrl.Manager, reconcileConcu
IsController: true,
OwnerType: &rayiov1alpha1.RayCluster{},
}).
Watches(&source.Kind{Type: &vcbetav1.PodGroup{}}, &handler.EnqueueRequestForOwner{
IsController: true,
OwnerType: &rayiov1alpha1.RayCluster{},
}).
WithOptions(controller.Options{MaxConcurrentReconciles: reconcileConcurrency}).
Complete(r)
}
@@ -1049,3 +1075,53 @@ func (r *RayClusterReconciler) updateClusterState(instance *rayiov1alpha1.RayClu
instance.Status.State = clusterState
return r.Status().Update(context.Background(), instance)
}

func (r *RayClusterReconciler) syncPodGroup(instance *rayiov1alpha1.RayCluster, size int32, totalResource corev1.ResourceList) error {
// use cluster name as pod group name
podGroupName := instance.Name
podGroupIdentifier := types.NamespacedName{
Name: podGroupName,
Namespace: instance.Namespace,
}
podGroup := &vcbetav1.PodGroup{}
if err := r.Get(context.TODO(), podGroupIdentifier, podGroup); err != nil {
if !errors.IsNotFound(err) {
return err
}

// use the same name as ray cluster
podGroup.Name = instance.Name
podGroup.Namespace = instance.Namespace
podGroup.OwnerReferences = []metav1.OwnerReference{
*metav1.NewControllerRef(instance, rayiov1alpha1.SchemeGroupVersion.WithKind("RayCluster")),
}
podGroup.Spec.MinMember = size
podGroup.Spec.MinResources = &totalResource
if queue, ok := instance.Spec.HeadGroupSpec.Template.Labels["volcano.sh/queue-name"]; ok {
podGroup.Spec.Queue = queue
}
podGroup.Spec.PriorityClassName = instance.Spec.HeadGroupSpec.Template.Spec.PriorityClassName
podGroup.Status = vcbetav1.PodGroupStatus{
Phase: vcbetav1.PodGroupPending,
}

if err := r.Create(context.TODO(), podGroup); err != nil {
if errors.IsAlreadyExists(err) {
r.Log.Info("pod group already exist, no need to create")
return nil
}
r.Log.Error(err, "Pod group create error!", "PodGroup.Error", err)
return err
}
} else {
if podGroup.Spec.MinMember != size || !utils.Equals(*podGroup.Spec.MinResources, totalResource) {
podGroup.Spec.MinMember = size
podGroup.Spec.MinResources = &totalResource
if err := r.Update(context.TODO(), podGroup); err != nil {
r.Log.Error(err, "Fail to update pod group!", "podGroup", podGroupIdentifier)
return err
}
}
}
return nil
}
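As a rough, hypothetical sketch of what syncPodGroup ends up creating (the field names come from the diff above; the cluster name, queue, and quantities are invented): the PodGroup reuses the RayCluster's name, and each head/worker pod is annotated with that name so Volcano schedules them as a single gang.

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	vcbetav1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
)

func main() {
	// Aggregate resources for the whole gang (illustrative values).
	minResources := corev1.ResourceList{
		corev1.ResourceCPU:    resource.MustParse("3"),
		corev1.ResourceMemory: resource.MustParse("6Gi"),
	}

	pg := vcbetav1.PodGroup{}
	pg.Name = "raycluster-sample" // same name as the RayCluster (hypothetical)
	pg.Spec.MinMember = 3         // head + workers that must be schedulable together
	pg.Spec.MinResources = &minResources
	pg.Spec.Queue = "kuberay-test-queue" // copied from the "volcano.sh/queue-name" label, if present

	// Each head/worker pod carries this annotation so Volcano maps it to the group.
	podAnnotations := map[string]string{vcbetav1.KubeGroupNameAnnotationKey: pg.Name}

	fmt.Println(pg.Name, pg.Spec.MinMember, podAnnotations)
}
```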
86 changes: 86 additions & 0 deletions ray-operator/controllers/ray/utils/util.go
@@ -213,6 +213,73 @@ func CalculateAvailableReplicas(pods corev1.PodList) int32 {
return count
}

func CalculateDesiredResources(cluster *rayiov1alpha1.RayCluster) corev1.ResourceList {
desiredResourcesList := []corev1.ResourceList{{}}
headPodResource := calculatePodResource(cluster.Spec.HeadGroupSpec.Template.Spec)
for i := int32(0); i < *cluster.Spec.HeadGroupSpec.Replicas; i++ {
desiredResourcesList = append(desiredResourcesList, headPodResource)
}
for _, nodeGroup := range cluster.Spec.WorkerGroupSpecs {
podResource := calculatePodResource(nodeGroup.Template.Spec)
for i := int32(0); i < *nodeGroup.Replicas; i++ {
desiredResourcesList = append(desiredResourcesList, podResource)
}
}
return sumResourceList(desiredResourcesList)
}

func CalculateMinResources(cluster *rayiov1alpha1.RayCluster) corev1.ResourceList {
minResourcesList := []corev1.ResourceList{{}}
headPodResource := calculatePodResource(cluster.Spec.HeadGroupSpec.Template.Spec)
for i := int32(0); i < *cluster.Spec.HeadGroupSpec.Replicas; i++ {
minResourcesList = append(minResourcesList, headPodResource)
}
for _, nodeGroup := range cluster.Spec.WorkerGroupSpecs {
podResource := calculatePodResource(nodeGroup.Template.Spec)
for i := int32(0); i < *nodeGroup.MinReplicas; i++ {
minResourcesList = append(minResourcesList, podResource)
}
}
return sumResourceList(minResourcesList)
}

func calculatePodResource(podSpec corev1.PodSpec) corev1.ResourceList {
podResource := corev1.ResourceList{}
for _, container := range podSpec.Containers {
containerResource := container.Resources.Requests
for name, quantity := range container.Resources.Limits {
if _, ok := containerResource[name]; !ok {
containerResource[name] = quantity
}
}
for name, quantity := range containerResource {
if totalQuantity, ok := podResource[name]; ok {
totalQuantity.Add(quantity)
podResource[name] = totalQuantity
} else {
podResource[name] = quantity
}
}
}
return podResource
}

func sumResourceList(list []corev1.ResourceList) corev1.ResourceList {
totalResource := corev1.ResourceList{}
for _, l := range list {
for name, quantity := range l {

if value, ok := totalResource[name]; !ok {
totalResource[name] = quantity.DeepCopy()
} else {
value.Add(quantity)
totalResource[name] = value
}
}
}
return totalResource
}
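
A standalone sketch (assumed values, not using the helpers above) of the request-versus-limit precedence that calculatePodResource applies per container: requests take priority, and limits only fill in resources that have no request.

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	container := corev1.Container{
		Resources: corev1.ResourceRequirements{
			Requests: corev1.ResourceList{
				corev1.ResourceCPU: resource.MustParse("500m"),
			},
			Limits: corev1.ResourceList{
				corev1.ResourceCPU:    resource.MustParse("1"),   // ignored: a CPU request exists
				corev1.ResourceMemory: resource.MustParse("2Gi"), // used: no memory request
			},
		},
	}

	// Start from the requests, then backfill any resource that only has a limit.
	effective := corev1.ResourceList{}
	for name, quantity := range container.Resources.Requests {
		effective[name] = quantity
	}
	for name, quantity := range container.Resources.Limits {
		if _, ok := effective[name]; !ok {
			effective[name] = quantity
		}
	}

	cpu := effective[corev1.ResourceCPU]
	mem := effective[corev1.ResourceMemory]
	fmt.Println(cpu.String(), mem.String()) // 500m 2Gi
}
```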

func Contains(elems []string, searchTerm string) bool {
for _, s := range elems {
if searchTerm == s {
@@ -359,3 +426,22 @@ func GenerateJsonHash(obj interface{}) (string, error) {

return hashStr, nil
}

// Equals returns true if the two lists are equivalent
func Equals(a corev1.ResourceList, b corev1.ResourceList) bool {
if len(a) != len(b) {
return false
}

for key, value1 := range a {
value2, found := b[key]
if !found {
return false
}
if value1.Cmp(value2) != 0 {
return false
}
}

return true
}
1 change: 1 addition & 0 deletions ray-operator/go.mod
@@ -21,6 +21,7 @@ require (
k8s.io/code-generator v0.23.0
k8s.io/utils v0.0.0-20210930125809-cb0fa318a74b
sigs.k8s.io/controller-runtime v0.11.1
volcano.sh/apis v1.6.0-alpha.0.0.20221012070524-685db38b4fae
)

require (
Expand Down
4 changes: 4 additions & 0 deletions ray-operator/go.sum
@@ -948,6 +948,8 @@ k8s.io/component-base v0.23.0 h1:UAnyzjvVZ2ZR1lF35YwtNY6VMN94WtOnArcXBu34es8=
k8s.io/component-base v0.23.0/go.mod h1:DHH5uiFvLC1edCpvcTDV++NKULdYYU6pR9Tt3HIKMKI=
k8s.io/gengo v0.0.0-20210813121822-485abfe95c7c h1:GohjlNKauSai7gN4wsJkeZ3WAJx4Sh+oT/b5IYn5suA=
k8s.io/gengo v0.0.0-20210813121822-485abfe95c7c/go.mod h1:FiNAH4ZV3gBg2Kwh89tzAEV2be7d5xI0vBa/VySYy3E=
k8s.io/klog v1.0.0 h1:Pt+yjF5aB1xDSVbau4VsWe+dQNzA0qv1LlXdC2dF6Q8=
k8s.io/klog v1.0.0/go.mod h1:4Bi6QPql/J/LkTDqv7R/cd3hPo4k2DG6Ptcz060Ez5I=
k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE=
k8s.io/klog/v2 v2.2.0/go.mod h1:Od+F08eJP+W3HUb4pSrPpgp9DGU4GzlpG/TmITuYh/Y=
k8s.io/klog/v2 v2.30.0 h1:bUO6drIvCIsvZ/XFgfxoGFQU/a4Qkh0iAlvUR7vlHJw=
@@ -972,3 +974,5 @@ sigs.k8s.io/structured-merge-diff/v4 v4.2.0/go.mod h1:j/nl6xW8vLS49O8YvXW1ocPhZa
sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc=
sigs.k8s.io/yaml v1.3.0 h1:a2VclLzOGrwOHDiV8EfBGhvjHvP46CtW5j6POvhYGGo=
sigs.k8s.io/yaml v1.3.0/go.mod h1:GeOyir5tyXNByN85N/dRIT9es5UQNerPYEKK56eTBm8=
volcano.sh/apis v1.6.0-alpha.0.0.20221012070524-685db38b4fae h1:H7yidKnIq/Y7KmjFP5xFSmE7xL674226D8pEoA/RfG8=
volcano.sh/apis v1.6.0-alpha.0.0.20221012070524-685db38b4fae/go.mod h1:drNMGuHPn1ew7oBSDQb5KRey6tXOQksbUtw3gPxF3Vo=
4 changes: 4 additions & 0 deletions ray-operator/main.go
@@ -21,6 +21,7 @@ import (
k8szap "sigs.k8s.io/controller-runtime/pkg/log/zap"

rayv1alpha1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1alpha1"
vcbetav1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
// +kubebuilder:scaffold:imports
)

@@ -35,6 +36,7 @@ var (
func init() {
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
utilruntime.Must(rayv1alpha1.AddToScheme(scheme))
utilruntime.Must(vcbetav1.AddToScheme(scheme))
// +kubebuilder:scaffold:scheme
}

@@ -63,6 +65,8 @@ func main() {
"Forced cluster upgrade flag")
flag.StringVar(&logFile, "log-file-path", "",
"Synchronize logs to local file")
flag.BoolVar(&ray.EnableBatchScheduler, "enable-batch-scheduler", false,
"Enable batch scheduler. Currently is volcano, which supports gang scheduler policy.")

opts := k8szap.Options{
Development: true,