-
Notifications
You must be signed in to change notification settings - Fork 403
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[RayCluster][Fix] Add expectations of RayCluster #2150
base: master
Are you sure you want to change the base?
Changes from 4 commits
10120e3
6d546d4
5010d8a
aa9f9e6
c1261c6
3d7a939
f6232c9
d3dd340
4e5b501
626a2f5
a611ec0
7230db2
d152d34
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,382 @@ | ||
package expectations | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"strconv" | ||
"time" | ||
|
||
corev1 "k8s.io/api/core/v1" | ||
"k8s.io/apimachinery/pkg/api/errors" | ||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||
"k8s.io/apimachinery/pkg/types" | ||
"k8s.io/client-go/tools/cache" | ||
"sigs.k8s.io/controller-runtime/pkg/client" | ||
|
||
rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" | ||
) | ||
|
||
var ( | ||
ResourceInitializers map[ExpectedResourceType]func() client.Object | ||
) | ||
|
||
const ( | ||
ExpectationsTimeout = 10 * time.Minute | ||
) | ||
|
||
type ExpectedResourceType string | ||
|
||
const ( | ||
Pod ExpectedResourceType = "Pod" | ||
RayCluster ExpectedResourceType = "RayCluster" | ||
) | ||
|
||
type ActiveExpectationAction int | ||
|
||
const ( | ||
Create ActiveExpectationAction = 0 | ||
Delete ActiveExpectationAction = 1 | ||
Update ActiveExpectationAction = 3 | ||
) | ||
|
||
func init() { | ||
ResourceInitializers = map[ExpectedResourceType]func() client.Object{ | ||
Pod: func() client.Object { | ||
return &corev1.Pod{} | ||
}, | ||
RayCluster: func() client.Object { | ||
return &rayv1.RayCluster{} | ||
}, | ||
} | ||
} | ||
|
||
type ActiveExpectationsInterface interface { | ||
ExpectCreate(key string, kind ExpectedResourceType, namespace, name string) error | ||
ExpectDelete(key string, kind ExpectedResourceType, namespace, name string) error | ||
ExpectUpdate(key string, kind ExpectedResourceType, namespace, name string, updatedResourceVersion string) error | ||
IsSatisfied(key string) (satisfied bool, err error) | ||
DeleteItem(key string, kind ExpectedResourceType, name string) error | ||
Delete(key string) error | ||
} | ||
|
||
func NewActiveExpectations(client client.Client) ActiveExpectationsInterface { | ||
return &ActiveExpectations{ | ||
Client: client, | ||
subjects: cache.NewStore(ExpKeyFunc), | ||
} | ||
} | ||
|
||
type ActiveExpectations struct { | ||
client.Client | ||
subjects cache.Store | ||
} | ||
|
||
func (ae *ActiveExpectations) ExpectCreate(key string, kind ExpectedResourceType, namespace, name string) error { | ||
return ae.expectCreateOrDelete(key, kind, Create, namespace, name) | ||
} | ||
|
||
func (ae *ActiveExpectations) ExpectDelete(key string, kind ExpectedResourceType, namespace, name string) error { | ||
return ae.expectCreateOrDelete(key, kind, Delete, namespace, name) | ||
} | ||
|
||
func (ae *ActiveExpectations) expectCreateOrDelete(key string, kind ExpectedResourceType, action ActiveExpectationAction, namespace, name string) error { | ||
expectation, exist, err := ae.subjects.GetByKey(key) | ||
if err != nil { | ||
return fmt.Errorf("fail to get expectation for active expectations %s when expecting: %s", key, err) | ||
} | ||
|
||
if !exist { | ||
expectation = NewActiveExpectation(ae.Client, key, namespace) | ||
if err = ae.subjects.Add(expectation); err != nil { | ||
return err | ||
} | ||
} | ||
|
||
if err = expectation.(*ActiveExpectation).expectCreateOrDelete(kind, name, action); err != nil { | ||
return fmt.Errorf("fail to expect %s/%s for action %d: %s", kind, name, action, err) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (ae *ActiveExpectations) ExpectUpdate(key string, kind ExpectedResourceType, namespace, name string, updatedResourceVersion string) error { | ||
rv, err := strconv.ParseInt(updatedResourceVersion, 10, 64) | ||
if err != nil { | ||
panic(fmt.Sprintf("fail to parse resource version %s of resource %s/%s to int64 for subject %s: %s", | ||
updatedResourceVersion, kind, name, key, err)) | ||
} | ||
|
||
if _, exist := ResourceInitializers[kind]; !exist { | ||
panic(fmt.Sprintf("kind %s is not supported for Active Expectation", kind)) | ||
} | ||
|
||
expectation, exist, err := ae.subjects.GetByKey(key) | ||
if err != nil { | ||
return fmt.Errorf("fail to get expectation for active expectations %s when expecting: %s", key, err) | ||
} | ||
|
||
if !exist { | ||
expectation = NewActiveExpectation(ae.Client, namespace, key) | ||
if err := ae.subjects.Add(expectation); err != nil { | ||
return err | ||
} | ||
} | ||
|
||
if err := expectation.(*ActiveExpectation).expectUpdate(kind, name, rv); err != nil { | ||
return fmt.Errorf("fail to expect %s/%s for action %d, %s: %s", kind, name, Update, updatedResourceVersion, err) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (ae *ActiveExpectations) IsSatisfied(key string) (satisfied bool, err error) { | ||
expectation, exist, err := ae.subjects.GetByKey(key) | ||
if err != nil { | ||
return false, fmt.Errorf("fail to get expectation for active expectations %s when check satisfication: %s", key, err) | ||
} | ||
|
||
if !exist { | ||
return true, nil | ||
} | ||
|
||
defer func() { | ||
if satisfied { | ||
ae.subjects.Delete(expectation) | ||
} | ||
}() | ||
|
||
satisfied, err = expectation.(*ActiveExpectation).isSatisfied() | ||
if err != nil { | ||
return false, err | ||
} | ||
|
||
return | ||
} | ||
|
||
func (ae *ActiveExpectations) DeleteItem(key string, kind ExpectedResourceType, name string) error { | ||
if _, exist := ResourceInitializers[kind]; !exist { | ||
panic(fmt.Sprintf("kind %s is not supported for Active Expectation", kind)) | ||
} | ||
|
||
expectation, exist, err := ae.subjects.GetByKey(key) | ||
if err != nil { | ||
return fmt.Errorf("fail to get expectation for active expectations %s when deleting name %s: %s", key, name, err) | ||
} | ||
|
||
if !exist { | ||
return nil | ||
} | ||
|
||
item := expectation.(*ActiveExpectation) | ||
if err := item.delete(string(kind), name); err != nil { | ||
return fmt.Errorf("fail to delete %s/%s for key %s: %s", kind, name, key, err) | ||
} | ||
|
||
if len(item.items.List()) == 0 { | ||
ae.subjects.Delete(expectation) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (ae *ActiveExpectations) Delete(key string) error { | ||
expectation, exist, err := ae.subjects.GetByKey(key) | ||
if err != nil { | ||
return fmt.Errorf("fail to get expectation for active expectations %s when deleting: %s", key, err) | ||
} | ||
|
||
if !exist { | ||
return nil | ||
} | ||
|
||
err = ae.subjects.Delete(expectation) | ||
if err != nil { | ||
return fmt.Errorf("fail to do delete expectation for active expectations %s when deleting: %s", key, err) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func ActiveExpectationItemKeyFunc(object interface{}) (string, error) { | ||
expectationItem, ok := object.(*ActiveExpectationItem) | ||
if !ok { | ||
return "", fmt.Errorf("fail to convert to active expectation item") | ||
} | ||
return expectationItem.Key, nil | ||
} | ||
|
||
func NewActiveExpectation(client client.Client, key, namespace string) *ActiveExpectation { | ||
return &ActiveExpectation{ | ||
Client: client, | ||
namespace: namespace, | ||
key: key, | ||
items: cache.NewStore(ActiveExpectationItemKeyFunc), | ||
recordTimestamp: time.Now(), | ||
} | ||
} | ||
|
||
type ActiveExpectation struct { | ||
client.Client | ||
namespace string | ||
key string | ||
items cache.Store | ||
recordTimestamp time.Time | ||
} | ||
|
||
func (ae *ActiveExpectation) expectCreateOrDelete(kind ExpectedResourceType, name string, action ActiveExpectationAction) error { | ||
|
||
key := fmt.Sprintf("%s/%s", kind, name) | ||
item, exist, err := ae.items.GetByKey(key) | ||
if err != nil { | ||
return fmt.Errorf("fail to get active expectation item for %s when expecting: %s", key, err) | ||
} | ||
|
||
ae.recordTimestamp = time.Now() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we use a mutex for updating the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Thanks for your review. 😺 The use of a mutex for Whether the As a result, only one goroutine handles the However, this can cause issues if There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 |
||
if !exist { | ||
item = &ActiveExpectationItem{Client: ae.Client, Name: name, Kind: kind, Key: key, Action: action, RecordTimestamp: time.Now()} | ||
if err := ae.items.Add(item); err != nil { | ||
return err | ||
} | ||
|
||
return nil | ||
} | ||
|
||
item.(*ActiveExpectationItem).Action = action | ||
item.(*ActiveExpectationItem).RecordTimestamp = time.Now() | ||
return nil | ||
} | ||
|
||
func (ae *ActiveExpectation) expectUpdate(kind ExpectedResourceType, name string, resourceVersion int64) error { | ||
key := fmt.Sprintf("%s/%s", kind, name) | ||
item, exist, err := ae.items.GetByKey(key) | ||
if err != nil { | ||
return fmt.Errorf("fail to get active expectation item for %s when expecting: %s", key, err) | ||
} | ||
|
||
ae.recordTimestamp = time.Now() | ||
if !exist { | ||
item = &ActiveExpectationItem{Client: ae.Client, Name: name, Kind: kind, Key: key, Action: Update, ResourceVersion: resourceVersion, RecordTimestamp: time.Now()} | ||
if err := ae.items.Add(item); err != nil { | ||
return err | ||
} | ||
|
||
return nil | ||
} | ||
|
||
ea := item.(*ActiveExpectationItem) | ||
ea.Action = Update | ||
ea.ResourceVersion = resourceVersion | ||
ea.RecordTimestamp = time.Now() | ||
|
||
return nil | ||
} | ||
|
||
func (ae *ActiveExpectation) isSatisfied() (satisfied bool, err error) { | ||
items := ae.items.List() | ||
|
||
satisfied = true | ||
for _, item := range items { | ||
itemSatisfied, itemErr := func() (satisfied bool, err error) { | ||
defer func() { | ||
if satisfied { | ||
ae.items.Delete(item) | ||
} else if ae.recordTimestamp.Add(ExpectationsTimeout).Before(time.Now()) { | ||
panic("expected panic for active expectation") | ||
} | ||
}() | ||
|
||
satisfied, err = item.(*ActiveExpectationItem).isSatisfied(ae.namespace) | ||
if err != nil { | ||
return false, err | ||
} | ||
|
||
return satisfied, nil | ||
}() | ||
|
||
if itemErr != nil && err == nil { | ||
err = fmt.Errorf("fail to check satisfication for subject %s, item %s: %s", ae.key, item.(*ActiveExpectationItem).Key, itemErr) | ||
} | ||
|
||
satisfied = satisfied && itemSatisfied | ||
} | ||
|
||
return satisfied, err | ||
} | ||
|
||
func (ae *ActiveExpectation) delete(kind, name string) error { | ||
key := fmt.Sprintf("%s/%s", kind, name) | ||
item, exist, err := ae.items.GetByKey(key) | ||
if err != nil { | ||
return fmt.Errorf("fail to delete active expectation item for %s: %s", key, err) | ||
} | ||
|
||
if !exist { | ||
return nil | ||
} | ||
|
||
if err := ae.items.Delete(item); err != nil { | ||
return fmt.Errorf("fail to do delete active expectation item for %s: %s", key, err) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
type ActiveExpectationItem struct { | ||
client.Client | ||
|
||
Key string | ||
Name string | ||
Kind ExpectedResourceType | ||
Action ActiveExpectationAction | ||
ResourceVersion int64 | ||
RecordTimestamp time.Time | ||
} | ||
|
||
func (i *ActiveExpectationItem) isSatisfied(namespace string) (bool, error) { | ||
Eikykun marked this conversation as resolved.
Show resolved
Hide resolved
|
||
switch i.Action { | ||
case Create: | ||
resource := ResourceInitializers[i.Kind]() | ||
if err := i.Get(context.TODO(), types.NamespacedName{Namespace: namespace, Name: i.Name}, resource); err == nil { | ||
return true, nil | ||
} else if errors.IsNotFound(err) && i.RecordTimestamp.Add(30*time.Second).Before(time.Now()) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What does this mean? Do you mean: (1) The Pod is not found in the informer cache. I can't understand (2). If we sent a request 30 seconds ago and the informer still hasn't received information about the Pod, there are two possibilities:
For case (a), it is OK for the function to say that the expectation is satisfied. However, for case (b), what will happen if the creation fails and we tell the KubeRay operator it is satisfied? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Case(b) may not occur here? Because only after the pod is successfully created, the pod is expected to be in the cache. if err := r.Create(ctx, &pod); err != nil {
return err
}
rayClusterExpectation.ExpectCreateHeadPod(key, pod.Namespace, pod.Name) |
||
// tolerate watch event missing, after 30s | ||
return true, nil | ||
} | ||
case Delete: | ||
resource := ResourceInitializers[i.Kind]() | ||
if err := i.Get(context.TODO(), types.NamespacedName{Namespace: namespace, Name: i.Name}, resource); err != nil { | ||
if errors.IsNotFound(err) { | ||
return true, nil | ||
} | ||
} else { | ||
Eikykun marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if resource.(metav1.Object).GetDeletionTimestamp() != nil { | ||
return true, nil | ||
} | ||
} | ||
case Update: | ||
Eikykun marked this conversation as resolved.
Show resolved
Hide resolved
|
||
resource := ResourceInitializers[i.Kind]() | ||
if err := i.Get(context.TODO(), types.NamespacedName{Namespace: namespace, Name: i.Name}, resource); err == nil { | ||
rv, err := strconv.ParseInt(resource.(metav1.Object).GetResourceVersion(), 10, 64) | ||
if err != nil { | ||
// true for error | ||
return true, nil | ||
} | ||
if rv >= i.ResourceVersion { | ||
return true, nil | ||
} | ||
} else { | ||
Eikykun marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if errors.IsNotFound(err) { | ||
return true, nil | ||
} | ||
} | ||
} | ||
return false, nil | ||
} | ||
|
||
// ExpKeyFunc to parse out the key from a ControlleeExpectation | ||
var ExpKeyFunc = func(obj interface{}) (string, error) { | ||
if e, ok := obj.(*ActiveExpectation); ok { | ||
return e.key, nil | ||
} | ||
panic(fmt.Sprintf("Could not find key for obj %#v", obj)) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are many read-after-write operations in the
ActiveExpectations
. Should we use a mutex to wrap these operations? For example, will the aboveae.subjects.Delete(expectation)
delete an unsatisfied expectation?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The subjects of
ActiveExpectations
areThreadSafeStore
, utilizingstore
provided by k8s.io/client-go/tools/cache. Therefore, operations onActiveExpectations.subjects
are thread-safe.For
ActiveExpectations.subjects
items, theActiveExpectation
also utilizes aThreadSafeStore
.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍