[RayCluster][Fix] Add expectations of RayCluster #2150

Open · wants to merge 13 commits into master
152 changes: 152 additions & 0 deletions ray-operator/controllers/ray/expectations/scale_expectation.go
@@ -0,0 +1,152 @@
package expectations

import (
"context"
"fmt"
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
)

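// DefaultHead is the group name under which expectations for the head Pod are recorded.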
const DefaultHead = ""

// ScaleAction is the action of a scale operation, such as create or delete.
type ScaleAction string

const (
// Create action
Create ScaleAction = "create"
// Delete action
Delete ScaleAction = "delete"
)

const (
// GroupIndex indexes Pods by the group they belong to within a RayCluster
GroupIndex = "group"
// RayClusterIndex indexes Pods by the RayCluster they belong to
RayClusterIndex = "raycluster"
)

// RayClusterScaleExpectation is an interface for setting and checking scale expectations of RayCluster groups.
type RayClusterScaleExpectation interface {
ExpectScalePod(rayClusterName, group, namespace, name string, action ScaleAction)
IsSatisfied(ctx context.Context, rayClusterName, group, namespace string) bool
Delete(rayClusterName, namespace string)
}

func newRayPodIndexer() cache.Indexer {
return cache.NewIndexer(rayPodKey, cache.Indexers{GroupIndex: groupIndexFunc, RayClusterIndex: rayClusterIndexFunc})
}

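// NewRayClusterScaleExpectation returns a RayClusterScaleExpectation that checks recorded expectations against the given client.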
func NewRayClusterScaleExpectation(client client.Client) RayClusterScaleExpectation {
return &realRayClusterScaleExpectation{
Client: client,
itemsCache: newRayPodIndexer(),
}
}

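// realRayClusterScaleExpectation stores pending expectations in an indexer and resolves them by reading Pods through the embedded client.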
type realRayClusterScaleExpectation struct {
client.Client
itemsCache cache.Indexer
}

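// ExpectScalePod records the expectation that the named Pod will be created or deleted.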
func (r *realRayClusterScaleExpectation) ExpectScalePod(rayClusterName, group, namespace, name string, action ScaleAction) {
_ = r.itemsCache.Add(&rayPod{
name: name,
namespace: namespace,
group: group,
rayCluster: rayClusterName,
action: action,
recordTimestamp: time.Now(),
})
}

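// IsSatisfied reports whether every expectation recorded for the given group has been observed; satisfied expectations are removed from the cache.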
func (r *realRayClusterScaleExpectation) IsSatisfied(ctx context.Context, rayClusterName, group, namespace string) (isSatisfied bool) {
items, _ := r.itemsCache.ByIndex(GroupIndex, fmt.Sprintf("%s/%s/%s", namespace, rayClusterName, group))
isSatisfied = true
for i := range items {
rp := items[i].(*rayPod)
pod := &corev1.Pod{}
isPodSatisfied := false
switch rp.action {
case Create:
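// A create expectation is satisfied once the Pod can be read through the client, or once 30 seconds have passed without it appearing (the expectation is treated as expired).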
if err := r.Get(ctx, types.NamespacedName{Name: rp.name, Namespace: namespace}, pod); err == nil {
isPodSatisfied = true
} else {
isPodSatisfied = errors.IsNotFound(err) && rp.recordTimestamp.Add(30*time.Second).Before(time.Now())
}
case Delete:
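// A delete expectation is satisfied once the Pod is gone or is marked with a deletion timestamp.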
if err := r.Get(ctx, types.NamespacedName{Name: rp.name, Namespace: namespace}, pod); err != nil {
isPodSatisfied = errors.IsNotFound(err)
} else {
isPodSatisfied = pod.DeletionTimestamp != nil
}
}
// delete satisfied item in cache
if isPodSatisfied {
_ = r.itemsCache.Delete(items[i])
} else {
isSatisfied = false
}
}
return isSatisfied
}

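// Delete drops every expectation recorded for the given RayCluster.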
func (r *realRayClusterScaleExpectation) Delete(rayClusterName, namespace string) {
items, _ := r.itemsCache.ByIndex(RayClusterIndex, fmt.Sprintf("%s/%s", namespace, rayClusterName))
for _, item := range items {
_ = r.itemsCache.Delete(item)
}
}

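// rayPod is a cache entry describing one pending scale expectation.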
type rayPod struct {
recordTimestamp time.Time
action ScaleAction
name string
namespace string
rayCluster string
group string
}

func (p *rayPod) Key() string {
return fmt.Sprintf("%s/%s", p.namespace, p.name)
}

func (p *rayPod) GroupKey() string {
return fmt.Sprintf("%s/%s/%s", p.namespace, p.rayCluster, p.group)
}

func (p *rayPod) ClusterKey() string {
return fmt.Sprintf("%s/%s", p.namespace, p.rayCluster)
}

func rayPodKey(obj interface{}) (string, error) {
return obj.(*rayPod).Key(), nil
}

func groupIndexFunc(obj interface{}) ([]string, error) {
return []string{obj.(*rayPod).GroupKey()}, nil
}

func rayClusterIndexFunc(obj interface{}) ([]string, error) {
return []string{obj.(*rayPod).ClusterKey()}, nil
}

func NewFakeRayClusterScaleExpectation() RayClusterScaleExpectation {
return &fakeRayClusterScaleExpectation{}
}

type fakeRayClusterScaleExpectation struct{}

func (r *fakeRayClusterScaleExpectation) ExpectScalePod(_, _, _, _ string, _ ScaleAction) {
}

func (r *fakeRayClusterScaleExpectation) IsSatisfied(_ context.Context, _, _, _ string) bool {
return true
}

func (r *fakeRayClusterScaleExpectation) Delete(_, _ string) {}
@@ -0,0 +1,87 @@
package expectations

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
clientFake "sigs.k8s.io/controller-runtime/pkg/client/fake"
)

func TestRayClusterExpectations(t *testing.T) {
setupTest()
ctx := context.TODO()
fakeClient := clientFake.NewClientBuilder().WithRuntimeObjects().Build()
exp := NewRayClusterScaleExpectation(fakeClient)
Comment on lines +17 to +18 (Member):

I don’t quite understand why our design needs to use a client and actually create an object. Take scale_expectations_test.go as an example: you can see that the test covers very simple things without involving any k8s operations. I believe our usage is almost identical, so we shouldn’t need to use the k8s client either.

https://github.com/openkruise/kruise/blob/c426ed9b1ea3565b1a18e2adba628a2d9b956d91/pkg/util/expectations/scale_expectations_test.go

Reply from @Eikykun (Author), Nov 7, 2024:

The fundamental reason for adding a client is that we use GenerateName when creating a Pod, rather than knowing the Pod name beforehand the way CloneSet does. The ObserveScale() method relies on the PodName specified in ExpectScale().
https://github.com/openkruise/kruise/blob/c426ed9b1ea3565b1a18e2adba628a2d9b956d91/pkg/controller/cloneset/sync/cloneset_scale.go#L205
https://github.com/openkruise/kruise/blob/c426ed9b1ea3565b1a18e2adba628a2d9b956d91/pkg/controller/cloneset/cloneset_event_handler.go#L75

In the design of CloneSet's Expectations, ScaleExpectations() must be called before a Pod is created or deleted (if it is called afterwards, ObserveScale() may be invoked in the podEventHandler before ScaleExpectations()). With GenerateName, however, we cannot know the Pod name before Create, so the Expectation design has to be modified to allow calling ScaleExpectations() after the Pod is created.

A similar issue appears in the design of ReplicaSet, which uses UIDTrackingControllerExpectations to track Pods but does not address UID tracking during Pod creation. Introducing expectations similar to those in ReplicaSet would complicate the code.
https://github.com/kubernetes/kubernetes/blob/master/pkg/controller/replicaset/replica_set.go#L582-L587
https://github.com/kubernetes/kubernetes/blob/master/pkg/controller/replicaset/replica_set.go#L632-L638

Adding a client leads to a simpler implementation, because we no longer need to handle complex ObserveScale logic in the podEventHandler. Before proceeding with reconciliation, we simply check whether the Pods in the informer satisfy the Expectations set during the last reconcile, and release each Expectation once it is satisfied.
The concern here might be whether the client becomes a performance bottleneck. In the code, if a Pod was created or deleted during the last reconcile, the next reconcile calls client.Get() to check whether that Pod meets the condition and deletes the Expectation on success. Under low watch latency, the number of client calls is roughly in line with the number of successful Pod creations and deletions; under high watch latency, repeated checks may occur more often. However, our practical validation so far shows that this does not cause a significant performance bottleneck.
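
For illustration, here is a minimal sketch of how a reconciler might consume this API under that design. Everything apart from the expectations API itself (the reconciler type, the rayClusterScaleExpectation field, group, and buildWorkerPod) is a hypothetical name, not taken from this PR:

// Hypothetical usage sketch; only the expectations API is from this PR, the surrounding names are illustrative.
func (r *RayClusterReconciler) reconcileGroup(ctx context.Context, instance *rayv1.RayCluster, group string) error {
	// Skip this group while a create/delete from the previous reconcile has not been observed yet.
	if !r.rayClusterScaleExpectation.IsSatisfied(ctx, instance.Name, group, instance.Namespace) {
		return nil
	}
	pod := buildWorkerPod(instance, group) // the Pod spec uses GenerateName, so pod.Name is empty here
	if err := r.Create(ctx, &pod); err != nil {
		return err
	}
	// After a successful create the API server has filled in pod.Name, so the expectation can be recorded now.
	r.rayClusterScaleExpectation.ExpectScalePod(instance.Name, group, instance.Namespace, pod.Name, expectations.Create)
	return nil
}

On the next reconcile, IsSatisfied stays false until the new Pod becomes visible through the client, which keeps the controller from scaling the same group twice.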

namespace := "default"
rayClusterName := "raycluster-test-pod"

// Test expect create head
exp.ExpectScalePod(rayClusterName, DefaultHead, namespace, testPods[0].(*corev1.Pod).Name, Create)
assert.Equal(t, len(exp.(*realRayClusterScaleExpectation).itemsCache.List()), 1)
assert.Equal(t, exp.IsSatisfied(ctx, rayClusterName, DefaultHead, namespace), false)
err := fakeClient.Create(context.TODO(), testPods[0].(*corev1.Pod))
assert.Nil(t, err, "Fail to create head pod")
assert.Equal(t, exp.IsSatisfied(ctx, rayClusterName, DefaultHead, namespace), true)
// delete satisfied item in cache
assert.Equal(t, len(exp.(*realRayClusterScaleExpectation).itemsCache.List()), 0)

// Test expect delete head
exp.ExpectScalePod(rayClusterName, DefaultHead, namespace, testPods[0].(*corev1.Pod).Name, Delete)
assert.Equal(t, exp.IsSatisfied(ctx, rayClusterName, DefaultHead, namespace), false)
// delete pod
err = fakeClient.Delete(context.TODO(), testPods[0].(*corev1.Pod))
assert.Nil(t, err, "Fail to delete head pod")
assert.Equal(t, exp.IsSatisfied(ctx, rayClusterName, DefaultHead, namespace), true)

// Test expect create worker
group := "test-group"
exp.ExpectScalePod(rayClusterName, group, namespace, testPods[1].(*corev1.Pod).Name, Create)
exp.ExpectScalePod(rayClusterName, group, namespace, testPods[2].(*corev1.Pod).Name, Create)
assert.Equal(t, exp.IsSatisfied(ctx, rayClusterName, group, namespace), false)
assert.Nil(t, fakeClient.Create(context.TODO(), testPods[1].(*corev1.Pod)), "Fail to create worker pod1")
assert.Equal(t, exp.IsSatisfied(ctx, rayClusterName, group, namespace), false)
assert.Nil(t, fakeClient.Create(context.TODO(), testPods[2].(*corev1.Pod)), "Fail to create worker pod2")
assert.Equal(t, exp.IsSatisfied(ctx, rayClusterName, group, namespace), true)

// Test delete all
// reset pods
setupTest()
exp.ExpectScalePod(rayClusterName, DefaultHead, namespace, testPods[0].(*corev1.Pod).Name, Create)
exp.ExpectScalePod(rayClusterName, group, namespace, testPods[1].(*corev1.Pod).Name, Delete)
exp.ExpectScalePod(rayClusterName, group, namespace, testPods[2].(*corev1.Pod).Name, Delete)
assert.Equal(t, exp.IsSatisfied(ctx, rayClusterName, DefaultHead, namespace), false)
assert.Equal(t, exp.IsSatisfied(ctx, rayClusterName, group, namespace), false)
exp.Delete(rayClusterName, namespace)
assert.Equal(t, exp.IsSatisfied(ctx, rayClusterName, DefaultHead, namespace), true)
assert.Equal(t, exp.IsSatisfied(ctx, rayClusterName, group, namespace), true)
assert.Equal(t, len(exp.(*realRayClusterScaleExpectation).itemsCache.List()), 0)
}

var testPods []runtime.Object

func setupTest() {
testPods = []runtime.Object{
&corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod1",
Namespace: "default",
},
},
&corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod2",
Namespace: "default",
},
},
&corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod3",
Namespace: "default",
},
},
}
}