Skip to content

Commit

Permalink
bugfix: list runtimeclass and csidriver error from cache when cloud-e…
Browse files Browse the repository at this point in the history
…dge network disconnected
  • Loading branch information
rambohe-ch committed Apr 19, 2021
1 parent c7c6023 commit 019e115
Show file tree
Hide file tree
Showing 7 changed files with 793 additions and 471 deletions.
6 changes: 6 additions & 0 deletions pkg/yurthub/cachemanager/cache_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,12 @@ func (cm *cacheManager) saveListObject(ctx context.Context, info *apirequest.Req
accessor := meta.NewAccessor()

comp, _ := util.ClientComponentFrom(ctx)
// list return no objects, create the key only.
if len(items) == 0 {
key, _ := util.KeyFunc(comp, info.Resource, info.Namespace, "")
return cm.storage.Create(key, nil)
}

var errs []error
for i := range items {
name, err := accessor.Name(items[i])
Expand Down
73 changes: 70 additions & 3 deletions pkg/yurthub/cachemanager/cache_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
proxyutil "github.com/openyurtio/openyurt/pkg/yurthub/proxy/util"
"github.com/openyurtio/openyurt/pkg/yurthub/util"
v1 "k8s.io/api/core/v1"
nodev1beta1 "k8s.io/api/node/v1beta1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -642,6 +643,71 @@ func TestCacheResponseForList(t *testing.T) {
},
},
},
{
desc: "cache response for list nodes with fieldselector",
group: "",
version: "v1",
key: "kubelet/nodes",
inputObj: runtime.Object(
&v1.NodeList{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Kind: "NodeList",
},
ListMeta: metav1.ListMeta{
ResourceVersion: "12",
},
Items: []v1.Node{
{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Kind: "Node",
},
ObjectMeta: metav1.ObjectMeta{
Name: "mynode",
ResourceVersion: "12",
},
},
},
},
),
userAgent: "kubelet",
accept: "application/json",
verb: "GET",
path: "/api/v1/nodes?fieldselector=meatadata.name=mynode",
namespaced: false,
expectResult: expectData{
data: map[string]struct{}{
"node-mynode-12": {},
},
},
},
{
desc: "cache response for list runtimeclasses with no objects",
group: "node.k8s.io",
version: "v1beta1",
key: "kubelet/runtimeclass",
inputObj: runtime.Object(
&nodev1beta1.RuntimeClassList{
TypeMeta: metav1.TypeMeta{
APIVersion: "node.k8s.io/v1beta1",
Kind: "RuntimeClassList",
},
ListMeta: metav1.ListMeta{
ResourceVersion: "12",
},
Items: []nodev1beta1.RuntimeClass{},
},
),
userAgent: "kubelet",
accept: "application/json",
verb: "GET",
path: "/apis/node.k8s.io/v1beta1/runtimeclasses",
namespaced: false,
expectResult: expectData{
data: map[string]struct{}{},
},
},
}

accessor := meta.NewAccessor()
Expand Down Expand Up @@ -683,16 +749,16 @@ func TestCacheResponseForList(t *testing.T) {

if tt.expectResult.err {
if err == nil {
t.Errorf("Got no error, but expect err")
t.Error("Got no error, but expect err")
}
} else {
if err != nil {
t.Errorf("Got error %v", err)
}

objs, err := storage.List(tt.key)
if err != nil || len(objs) == 0 {
t.Errorf("failed to get object from storage")
if err != nil {
t.Errorf("failed to list objects from storage, %v", err)
}

if len(objs) != len(tt.expectResult.data) {
Expand All @@ -717,6 +783,7 @@ func TestCacheResponseForList(t *testing.T) {
}
}
}
resetStorage(storage, tt.key)
})
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/yurthub/cachemanager/fake_storage_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (fsw *fakeStorageWrapper) ListKeys(key string) ([]string, error) {
func (fsw *fakeStorageWrapper) List(key string) ([]runtime.Object, error) {
objs := make([]runtime.Object, 0, len(fsw.data))
for k, obj := range fsw.data {
if strings.HasPrefix(k, key) {
if strings.HasPrefix(k, key) && obj != nil {
objs = append(objs, obj)
}
}
Expand Down
27 changes: 23 additions & 4 deletions pkg/yurthub/cachemanager/storage_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,18 +59,23 @@ func NewStorageWrapper(storage storage.Store) StorageWrapper {
}

// Create store runtime object into backend storage
// if obj is nil, the storage used to represent the key
// will be created. for example: for disk storage,
// a directory that indicates the key will be created.
func (sw *storageWrapper) Create(key string, obj runtime.Object) error {
var buf bytes.Buffer
if err := sw.backendSerializer.Encode(obj, &buf); err != nil {
klog.Errorf("failed to encode object in create for %s, %v", key, err)
return err
if obj != nil {
if err := sw.backendSerializer.Encode(obj, &buf); err != nil {
klog.Errorf("failed to encode object in create for %s, %v", key, err)
return err
}
}

if err := sw.store.Create(key, buf.Bytes()); err != nil {
return err
}

if isCacheKey(key) {
if obj != nil && isCacheKey(key) {
sw.Lock()
sw.cache[key] = obj
sw.Unlock()
Expand Down Expand Up @@ -141,6 +146,14 @@ func (sw *storageWrapper) List(key string) ([]runtime.Object, error) {
klog.Errorf("could not list objects for %s, %v", key, err)
return nil, err
} else if len(bb) == 0 {
if isPodKey(key) {
// because at least there will be yurt-hub pod on the node.
// if no pods in cache, maybe all of pods have been deleted by accident,
// if empty object is returned, pods on node will be deleted by kubelet.
// in order to prevent the influence to business, return error here so pods
// will be kept on node.
return objects, storage.ErrStorageNotFound
}
return objects, nil
}

Expand Down Expand Up @@ -200,3 +213,9 @@ func isCacheKey(key string) bool {

return false
}

// isPodKey verify the key is kubelet/pods or not
func isPodKey(key string) bool {
comp, resource, _, _ := util.SplitKey(key)
return comp == "kubelet" && resource == "pods"
}
Loading

0 comments on commit 019e115

Please sign in to comment.