Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RayCluster][Fix] Add expectations of RayCluster #2150

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
382 changes: 382 additions & 0 deletions ray-operator/controllers/ray/expectations/active_expectation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,382 @@
package expectations

import (
"context"
"fmt"
"strconv"
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"
"sigs.k8s.io/controller-runtime/pkg/client"

rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
)

var (
// ResourceInitializers maps each supported ExpectedResourceType to a factory
// returning a new, empty object of that kind. It is populated by init() and
// used to allocate the target object passed to Client.Get when an
// expectation is checked.
ResourceInitializers map[ExpectedResourceType]func() client.Object
)

const (
// ExpectationsTimeout is how long after an ActiveExpectation's last recorded
// expectation an item may stay unsatisfied; once exceeded,
// ActiveExpectation.isSatisfied panics.
ExpectationsTimeout = 10 * time.Minute
)

// ExpectedResourceType names a kind of resource that an expectation can be
// recorded for. Each value is expected to have an entry in ResourceInitializers.
type ExpectedResourceType string

const (
// Pod identifies corev1.Pod resources.
Pod ExpectedResourceType = "Pod"
// RayCluster identifies rayv1.RayCluster resources.
RayCluster ExpectedResourceType = "RayCluster"
)

// ActiveExpectationAction is the kind of change an expectation is waiting to
// observe on a resource.
type ActiveExpectationAction int

const (
// Create expects the resource to become readable through the client.
Create ActiveExpectationAction = 0
// Delete expects the resource to be gone or to carry a deletion timestamp.
Delete ActiveExpectationAction = 1
// Update expects the resource to reach at least a recorded resource version.
// NOTE(review): the value 2 is skipped here; confirm whether that is intentional.
Update ActiveExpectationAction = 3
)

func init() {
ResourceInitializers = map[ExpectedResourceType]func() client.Object{
Pod: func() client.Object {
return &corev1.Pod{}
},
RayCluster: func() client.Object {
return &rayv1.RayCluster{}
},
}
}

// ActiveExpectationsInterface records, per subject key, the resource changes a
// caller has requested and reports whether they have all been observed.
// The method notes below describe the ActiveExpectations implementation in
// this file.
type ActiveExpectationsInterface interface {
// ExpectCreate records, under subject key, that kind namespace/name is expected to be created.
ExpectCreate(key string, kind ExpectedResourceType, namespace, name string) error
// ExpectDelete records, under subject key, that kind namespace/name is expected to be deleted.
ExpectDelete(key string, kind ExpectedResourceType, namespace, name string) error
// ExpectUpdate records, under subject key, that kind namespace/name is expected
// to reach at least updatedResourceVersion (parsed as a base-10 int64).
ExpectUpdate(key string, kind ExpectedResourceType, namespace, name string, updatedResourceVersion string) error
// IsSatisfied reports whether every expectation recorded under key has been
// observed; a key with no recorded expectations is reported as satisfied.
IsSatisfied(key string) (satisfied bool, err error)
// DeleteItem drops the single expectation for kind/name under key, if any.
DeleteItem(key string, kind ExpectedResourceType, name string) error
// Delete drops every expectation recorded under key, if any.
Delete(key string) error
}

// NewActiveExpectations builds an empty expectation tracker whose satisfaction
// checks read resources through the given client.
func NewActiveExpectations(client client.Client) ActiveExpectationsInterface {
	tracker := &ActiveExpectations{subjects: cache.NewStore(ExpKeyFunc)}
	tracker.Client = client
	return tracker
}

// ActiveExpectations is the ActiveExpectationsInterface implementation returned
// by NewActiveExpectations.
type ActiveExpectations struct {
// Client is embedded and handed to every ActiveExpectation created for a subject.
client.Client
// subjects holds one *ActiveExpectation per subject key, keyed via ExpKeyFunc.
subjects cache.Store
}

// ExpectCreate records a Create expectation for kind namespace/name under the
// subject key by delegating to expectCreateOrDelete.
func (ae *ActiveExpectations) ExpectCreate(key string, kind ExpectedResourceType, namespace, name string) error {
return ae.expectCreateOrDelete(key, kind, Create, namespace, name)
}

// ExpectDelete records a Delete expectation for kind namespace/name under the
// subject key by delegating to expectCreateOrDelete.
func (ae *ActiveExpectations) ExpectDelete(key string, kind ExpectedResourceType, namespace, name string) error {
return ae.expectCreateOrDelete(key, kind, Delete, namespace, name)
}

// expectCreateOrDelete records a Create or Delete expectation (per action) for
// the resource kind/name under the subject key, creating the subject's
// ActiveExpectation in namespace on first use.
//
// It returns an error when kind has no entry in ResourceInitializers, when the
// subject store cannot be read or written, or when the item cannot be recorded.
func (ae *ActiveExpectations) expectCreateOrDelete(key string, kind ExpectedResourceType, action ActiveExpectationAction, namespace, name string) error {
	// Reject unsupported kinds up front (as ExpectUpdate and DeleteItem do):
	// an unknown kind would otherwise be stored and later invoke a nil
	// initializer when the expectation is checked.
	if _, supported := ResourceInitializers[kind]; !supported {
		return fmt.Errorf("kind %s is not supported for Active Expectation", kind)
	}

	stored, exist, err := ae.subjects.GetByKey(key)
	if err != nil {
		return fmt.Errorf("fail to get expectation for active expectations %s when expecting: %w", key, err)
	}

	var expectation *ActiveExpectation
	if exist {
		expectation = stored.(*ActiveExpectation)
	} else {
		expectation = NewActiveExpectation(ae.Client, key, namespace)
		if err := ae.subjects.Add(expectation); err != nil {
			return err
		}
	}

	if err := expectation.expectCreateOrDelete(kind, name, action); err != nil {
		return fmt.Errorf("fail to expect %s/%s for action %d: %w", kind, name, action, err)
	}

	return nil
}

// ExpectUpdate records, under the subject key, that the resource kind/name in
// namespace is expected to be observed at updatedResourceVersion or later.
//
// updatedResourceVersion must be a base-10 integer and kind must have an entry
// in ResourceInitializers; otherwise an error is returned and nothing is
// recorded. Errors from the underlying stores are returned as well.
func (ae *ActiveExpectations) ExpectUpdate(key string, kind ExpectedResourceType, namespace, name string, updatedResourceVersion string) error {
	rv, err := strconv.ParseInt(updatedResourceVersion, 10, 64)
	if err != nil {
		return fmt.Errorf("fail to parse resource version %s of resource %s/%s to int64 for subject %s: %w",
			updatedResourceVersion, kind, name, key, err)
	}

	if _, supported := ResourceInitializers[kind]; !supported {
		return fmt.Errorf("kind %s is not supported for Active Expectation", kind)
	}

	stored, exist, err := ae.subjects.GetByKey(key)
	if err != nil {
		return fmt.Errorf("fail to get expectation for active expectations %s when expecting: %w", key, err)
	}

	var expectation *ActiveExpectation
	if exist {
		expectation = stored.(*ActiveExpectation)
	} else {
		// NewActiveExpectation takes (client, key, namespace). Passing them in
		// the wrong order stores the subject under the namespace, so it is
		// never found again by key and its lookups use the wrong namespace.
		expectation = NewActiveExpectation(ae.Client, key, namespace)
		if err := ae.subjects.Add(expectation); err != nil {
			return err
		}
	}

	if err := expectation.expectUpdate(kind, name, rv); err != nil {
		return fmt.Errorf("fail to expect %s/%s for action %d, %s: %w", kind, name, Update, updatedResourceVersion, err)
	}

	return nil
}

// IsSatisfied reports whether every expectation recorded under the subject key
// has been observed. A key with no recorded expectation is reported as
// satisfied. When the subject is satisfied it is removed from the store; when
// the check fails, false and the error are returned and the subject is kept.
func (ae *ActiveExpectations) IsSatisfied(key string) (satisfied bool, err error) {
expectation, exist, err := ae.subjects.GetByKey(key)
if err != nil {
return false, fmt.Errorf("fail to get expectation for active expectations %s when check satisfication: %s", key, err)
}

if !exist {
return true, nil
}

// The deferred cleanup reads the named result, so it only runs the removal
// when the function is about to return satisfied == true.
// NOTE(review): the error returned by subjects.Delete is discarded here.
defer func() {
if satisfied {
ae.subjects.Delete(expectation)
}
}()

satisfied, err = expectation.(*ActiveExpectation).isSatisfied()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are many read-after-write operations in the ActiveExpectations. Should we use a mutex to wrap these operations? For example, will the above ae.subjects.Delete(expectation) delete an unsatisfied expectation?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The subjects of ActiveExpectations are ThreadSafeStore, utilizing store provided by k8s.io/client-go/tools/cache. Therefore, operations on ActiveExpectations.subjects are thread-safe.
For ActiveExpectations.subjects items, the ActiveExpectation also utilizes a ThreadSafeStore.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

if err != nil {
return false, err
}

return
}

// DeleteItem drops the expectation for the resource kind/name recorded under
// the subject key. When that leaves the subject without any items, the subject
// itself is removed as well. A key or item that is not recorded is not an error.
//
// It returns an error when kind has no entry in ResourceInitializers or when
// one of the underlying stores fails.
func (ae *ActiveExpectations) DeleteItem(key string, kind ExpectedResourceType, name string) error {
	if _, supported := ResourceInitializers[kind]; !supported {
		return fmt.Errorf("kind %s is not supported for Active Expectation", kind)
	}

	expectation, exist, err := ae.subjects.GetByKey(key)
	if err != nil {
		return fmt.Errorf("fail to get expectation for active expectations %s when deleting name %s: %w", key, name, err)
	}

	if !exist {
		return nil
	}

	item := expectation.(*ActiveExpectation)
	if err := item.delete(string(kind), name); err != nil {
		return fmt.Errorf("fail to delete %s/%s for key %s: %w", kind, name, key, err)
	}

	// Remove the now-empty subject and surface a failure instead of silently
	// leaving a stale entry behind.
	if len(item.items.List()) == 0 {
		if err := ae.subjects.Delete(expectation); err != nil {
			return fmt.Errorf("fail to do delete expectation for active expectations %s when deleting item %s/%s: %w", key, kind, name, err)
		}
	}

	return nil
}

// Delete forgets every expectation recorded under the subject key. Deleting a
// key that is not recorded succeeds without doing anything.
func (ae *ActiveExpectations) Delete(key string) error {
	subject, found, lookupErr := ae.subjects.GetByKey(key)
	switch {
	case lookupErr != nil:
		return fmt.Errorf("fail to get expectation for active expectations %s when deleting: %s", key, lookupErr)
	case !found:
		return nil
	}

	if deleteErr := ae.subjects.Delete(subject); deleteErr != nil {
		return fmt.Errorf("fail to do delete expectation for active expectations %s when deleting: %s", key, deleteErr)
	}
	return nil
}

// ActiveExpectationItemKeyFunc is the store key function for expectation
// items: it yields the item's Key, or an error when object is not an
// *ActiveExpectationItem.
func ActiveExpectationItemKeyFunc(object interface{}) (string, error) {
	if item, isItem := object.(*ActiveExpectationItem); isItem {
		return item.Key, nil
	}
	return "", fmt.Errorf("fail to convert to active expectation item")
}

// NewActiveExpectation creates the (initially empty) set of expectations for
// the subject identified by key, whose resources are looked up in namespace
// through client. The record timestamp starts at the time of the call.
func NewActiveExpectation(client client.Client, key, namespace string) *ActiveExpectation {
	expectation := new(ActiveExpectation)
	expectation.Client = client
	expectation.namespace = namespace
	expectation.key = key
	expectation.items = cache.NewStore(ActiveExpectationItemKeyFunc)
	expectation.recordTimestamp = time.Now()
	return expectation
}

// ActiveExpectation holds all pending expectations of a single subject.
type ActiveExpectation struct {
// Client is embedded and copied into every item created for this subject.
client.Client
// namespace is the namespace used when items look their resource up.
namespace string
// key identifies the subject; it is what ExpKeyFunc returns for this object.
key string
// items stores *ActiveExpectationItem values keyed by "<kind>/<name>".
items cache.Store
// recordTimestamp is refreshed whenever an expectation is recorded and is
// compared against ExpectationsTimeout when checking satisfaction.
recordTimestamp time.Time
}

// expectCreateOrDelete records that the resource kind/name is expected to go
// through action. A new item is added when none exists for "<kind>/<name>";
// otherwise the existing item's action and timestamp are overwritten. The
// subject-level recordTimestamp is refreshed in both cases.
func (ae *ActiveExpectation) expectCreateOrDelete(kind ExpectedResourceType, name string, action ActiveExpectationAction) error {

key := fmt.Sprintf("%s/%s", kind, name)
item, exist, err := ae.items.GetByKey(key)
if err != nil {
return fmt.Errorf("fail to get active expectation item for %s when expecting: %s", key, err)
}

// NOTE(review): this write is not guarded by a lock; the thread below
// discusses why that is considered acceptable for the current callers.
ae.recordTimestamp = time.Now()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we use a mutex for updating the recordTimestamp?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we use a mutex for updating the recordTimestamp?

Thanks for your review. 😺

The use of a mutex for ActiveExpectation's recordTimestamp depends on the context of the ActiveExpectations. Currently, it is only employed in the controller's Reconcile func. Within the controller, multiple workers reconcile concurrently, but only one worker handles a given reconcile.Request at any given time.

Whether the recordTimestamp is used with a mutex in ActiveExpectation depends on its usage context. It is currently only used within the Reconcile func in the controller, where multiple workers are running in parallel. However, for the same ReconcileRequest, only a single worker handles it at any given time.

As a result, only one goroutine handles the ActiveExpectations associated with the same RayCluster, ensuring there won't be any concurrency issues with reading and writing.

However, this can cause issues if ActiveExpectations are used externally by something like an EventHandler. But we aren't seeing this use case currently.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

if !exist {
item = &ActiveExpectationItem{Client: ae.Client, Name: name, Kind: kind, Key: key, Action: action, RecordTimestamp: time.Now()}
if err := ae.items.Add(item); err != nil {
return err
}

return nil
}

// An item already exists for this resource: mutate it in place.
item.(*ActiveExpectationItem).Action = action
item.(*ActiveExpectationItem).RecordTimestamp = time.Now()
return nil
}

// expectUpdate records that the resource kind/name is expected to reach at
// least resourceVersion. An existing item for the same resource is switched to
// an Update expectation in place; otherwise a new item is stored. The
// subject-level recordTimestamp is refreshed either way.
func (ae *ActiveExpectation) expectUpdate(kind ExpectedResourceType, name string, resourceVersion int64) error {
	itemKey := fmt.Sprintf("%s/%s", kind, name)
	stored, found, err := ae.items.GetByKey(itemKey)
	if err != nil {
		return fmt.Errorf("fail to get active expectation item for %s when expecting: %s", itemKey, err)
	}

	ae.recordTimestamp = time.Now()

	if found {
		existing := stored.(*ActiveExpectationItem)
		existing.Action = Update
		existing.ResourceVersion = resourceVersion
		existing.RecordTimestamp = time.Now()
		return nil
	}

	fresh := &ActiveExpectationItem{
		Client:          ae.Client,
		Name:            name,
		Kind:            kind,
		Key:             itemKey,
		Action:          Update,
		ResourceVersion: resourceVersion,
		RecordTimestamp: time.Now(),
	}
	return ae.items.Add(fresh)
}

// isSatisfied checks every pending item of this subject. Satisfied items are
// removed from the store as they are found. The result is true only when all
// items were satisfied; the first item error encountered is returned (wrapped
// with the subject and item key) while the remaining items are still checked.
//
// It panics when an item is unsatisfied and more than ExpectationsTimeout has
// elapsed since recordTimestamp.
func (ae *ActiveExpectation) isSatisfied() (satisfied bool, err error) {
	satisfied = true
	for _, stored := range ae.items.List() {
		itemSatisfied, itemErr := ae.checkAndPruneItem(stored)

		if err == nil && itemErr != nil {
			err = fmt.Errorf("fail to check satisfication for subject %s, item %s: %s", ae.key, stored.(*ActiveExpectationItem).Key, itemErr)
		}

		satisfied = satisfied && itemSatisfied
	}

	return satisfied, err
}

// checkAndPruneItem evaluates one stored item. On the way out it deletes the
// item when satisfied, or panics when it is unsatisfied past ExpectationsTimeout.
func (ae *ActiveExpectation) checkAndPruneItem(stored interface{}) (ok bool, err error) {
	defer func() {
		switch {
		case ok:
			// Deletion error intentionally ignored, as before.
			_ = ae.items.Delete(stored)
		case ae.recordTimestamp.Add(ExpectationsTimeout).Before(time.Now()):
			panic("expected panic for active expectation")
		}
	}()

	ok, err = stored.(*ActiveExpectationItem).isSatisfied(ae.namespace)
	if err != nil {
		return false, err
	}
	return ok, nil
}

// delete removes the item stored for "<kind>/<name>" from this subject.
// A missing item is not an error.
func (ae *ActiveExpectation) delete(kind, name string) error {
	itemKey := fmt.Sprintf("%s/%s", kind, name)

	stored, found, lookupErr := ae.items.GetByKey(itemKey)
	switch {
	case lookupErr != nil:
		return fmt.Errorf("fail to delete active expectation item for %s: %s", itemKey, lookupErr)
	case !found:
		return nil
	}

	if deleteErr := ae.items.Delete(stored); deleteErr != nil {
		return fmt.Errorf("fail to do delete active expectation item for %s: %s", itemKey, deleteErr)
	}
	return nil
}

// ActiveExpectationItem is a single pending expectation about one resource.
type ActiveExpectationItem struct {
// Client is embedded and used to read the resource when checking satisfaction.
client.Client

// Key is the store key, formatted as "<kind>/<name>" by the recording methods.
Key string
// Name is the resource name; the namespace is supplied by the owning subject.
Name string
// Kind selects the object factory in ResourceInitializers.
Kind ExpectedResourceType
// Action is the change being waited for.
Action ActiveExpectationAction
// ResourceVersion is the minimum version expected; only set for Update.
ResourceVersion int64
// RecordTimestamp is when this expectation was last recorded.
RecordTimestamp time.Time
}

// isSatisfied reads the resource namespace/Name through the embedded client
// and reports whether the expected action has been observed:
//   - Create: the resource can be read, or it is NotFound and more than 30
//     seconds have passed since RecordTimestamp.
//   - Delete: the resource is NotFound, or it carries a deletion timestamp.
//   - Update: the resource's version is >= ResourceVersion, its version cannot
//     be parsed as an integer, or the resource is NotFound.
//
// Every other outcome yields (false, nil).
// NOTE(review): Get errors other than NotFound are not returned to the caller;
// they fall through to (false, nil) in all three branches.
func (i *ActiveExpectationItem) isSatisfied(namespace string) (bool, error) {
Eikykun marked this conversation as resolved.
Show resolved Hide resolved
switch i.Action {
case Create:
resource := ResourceInitializers[i.Kind]()
if err := i.Get(context.TODO(), types.NamespacedName{Namespace: namespace, Name: i.Name}, resource); err == nil {
return true, nil
} else if errors.IsNotFound(err) && i.RecordTimestamp.Add(30*time.Second).Before(time.Now()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does this mean? Do you mean:

(1) The Pod is not found in the informer cache.
(2) KubeRay has already submitted a Create request to the K8s API server at t=RecordTimestamp. If the Create request was made more than 30 seconds ago, we assume it satisfies the expectation.

I can't understand (2). If we sent a request 30 seconds ago and the informer still hasn't received information about the Pod, there are two possibilities:

  • (a) There are delays between the K8s API server and the informer cache.
  • (b) The creation failed.

For case (a), it is OK for the function to say that the expectation is satisfied. However, for case (b), what will happen if the creation fails and we tell the KubeRay operator it is satisfied?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Case(b) may not occur here? Because only after the pod is successfully created, the pod is expected to be in the cache.

if err := r.Create(ctx, &pod); err != nil {
	return err
}
rayClusterExpectation.ExpectCreateHeadPod(key, pod.Namespace, pod.Name)

// tolerate watch event missing, after 30s
return true, nil
}
case Delete:
resource := ResourceInitializers[i.Kind]()
if err := i.Get(context.TODO(), types.NamespacedName{Namespace: namespace, Name: i.Name}, resource); err != nil {
if errors.IsNotFound(err) {
return true, nil
}
} else {
Eikykun marked this conversation as resolved.
Show resolved Hide resolved
// The object still exists; a set deletion timestamp counts as deleted.
if resource.(metav1.Object).GetDeletionTimestamp() != nil {
return true, nil
}
}
case Update:
Eikykun marked this conversation as resolved.
Show resolved Hide resolved
resource := ResourceInitializers[i.Kind]()
if err := i.Get(context.TODO(), types.NamespacedName{Namespace: namespace, Name: i.Name}, resource); err == nil {
rv, err := strconv.ParseInt(resource.(metav1.Object).GetResourceVersion(), 10, 64)
if err != nil {
// true for error
return true, nil
}
if rv >= i.ResourceVersion {
return true, nil
}
} else {
Eikykun marked this conversation as resolved.
Show resolved Hide resolved
// A resource that no longer exists cannot reach the expected version,
// so the expectation is treated as satisfied.
if errors.IsNotFound(err) {
return true, nil
}
}
}
return false, nil
}

// ExpKeyFunc is the store key function for subjects: it returns the key of an
// *ActiveExpectation and panics when handed any other type.
var ExpKeyFunc = func(obj interface{}) (string, error) {
	expectation, isExpectation := obj.(*ActiveExpectation)
	if !isExpectation {
		panic(fmt.Sprintf("Could not find key for obj %#v", obj))
	}
	return expectation.key, nil
}
Loading