From 42284622098012dcd3b10eedd4e0bbedb295f43b Mon Sep 17 00:00:00 2001 From: chentao <421224811@qq.com> Date: Tue, 16 Mar 2021 18:15:40 +0800 Subject: [PATCH] Enhance Yurthub caching ability --- pkg/yurthub/cachemanager/cache_manager.go | 100 +-- .../cachemanager/cache_manager_test.go | 653 +++++++++++++++++- pkg/yurthub/cachemanager/storage_wrapper.go | 33 +- .../kubernetes/serializer/serializer.go | 198 +++++- pkg/yurthub/proxy/local/local_test.go | 24 +- 5 files changed, 902 insertions(+), 106 deletions(-) diff --git a/pkg/yurthub/cachemanager/cache_manager.go b/pkg/yurthub/cachemanager/cache_manager.go index c4796be4421..df05c938f3f 100644 --- a/pkg/yurthub/cachemanager/cache_manager.go +++ b/pkg/yurthub/cachemanager/cache_manager.go @@ -25,6 +25,7 @@ import ( "os" "path" "strconv" + "strings" "sync" "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer" @@ -33,6 +34,7 @@ import ( "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/watch" @@ -42,40 +44,6 @@ import ( "k8s.io/klog" ) -var ( - resourceToKindMap = map[string]string{ - "nodes": "Node", - "pods": "Pod", - "services": "Service", - "namespaces": "Namespace", - "endpoints": "Endpoints", - "configmaps": "ConfigMap", - "persistentvolumes": "PersistentVolume", - "persistentvolumeclaims": "PersistentVolumeClaim", - "events": "Event", - "secrets": "Secret", - "leases": "Lease", - "runtimeclasses": "RuntimeClass", - "csidrivers": "CSIDriver", - } - - resourceToListKindMap = map[string]string{ - "nodes": "NodeList", - "pods": "PodList", - "services": "ServiceList", - "namespaces": "NamespaceList", - "endpoints": "EndpointsList", - "configmaps": "ConfigMapList", - "persistentvolumes": "PersistentVolumeList", - "persistentvolumeclaims": "PersistentVolumeClaimList", - "events": "EventList", - "secrets": "SecretList", - "leases": "LeaseList", - "runtimeclasses": "RuntimeClassList", - "csidrivers": "CSIDriverList", - } -) - // CacheManager is an adaptor to cache runtime object data into backend storage type CacheManager interface { CacheResponse(ctx context.Context, prc io.ReadCloser, stopCh <-chan struct{}) error @@ -169,19 +137,6 @@ func (cm *cacheManager) queryListObject(req *http.Request) (runtime.Object, erro comp, _ := util.ClientComponentFrom(ctx) info, _ := apirequest.RequestInfoFrom(ctx) - listKind := resourceToListKindMap[info.Resource] - listGvk := schema.GroupVersionKind{ - Group: info.APIGroup, - Version: info.APIVersion, - Kind: listKind, - } - - listObj, err := scheme.Scheme.New(listGvk) - if err != nil { - klog.Errorf("failed to create list object(%v), %v", listGvk, err) - return nil, err - } - key, err := util.KeyFunc(comp, info.Resource, info.Namespace, info.Name) if err != nil { return nil, err @@ -190,6 +145,24 @@ func (cm *cacheManager) queryListObject(req *http.Request) (runtime.Object, erro objs, err := cm.storage.List(key) if err != nil { return nil, err + } else if len(objs) == 0 { + return nil, nil + } + var listObj runtime.Object + listGvk := schema.GroupVersionKind{ + Group: info.APIGroup, + Version: info.APIVersion, + Kind: objs[0].GetObjectKind().GroupVersionKind().Kind + "List", + } + if scheme.Scheme.Recognizes(listGvk) { + listObj, err = scheme.Scheme.New(listGvk) + if err != nil { + klog.Errorf("failed to create list object(%v), %v", listGvk, err) + return nil, err + } + } else { + listObj = new(unstructured.UnstructuredList) + listObj.GetObjectKind().SetGroupVersionKind(listGvk) } listRv := 0 @@ -248,20 +221,10 @@ func (cm *cacheManager) saveWatchObject(ctx context.Context, info *apirequest.Re comp, _ := util.ClientComponentFrom(ctx) reqContentType, _ := util.ReqContentTypeFrom(ctx) - serializers, err := cm.serializerManager.CreateSerializers(reqContentType, info.APIGroup, info.APIVersion) - if err != nil { - klog.Errorf("failed to create serializers in saveWatchObject, %v", err) - return err - } - kind := resourceToKindMap[info.Resource] - apiVersion := schema.GroupVersion{ - Group: info.APIGroup, - Version: info.APIVersion, - }.String() accessor := meta.NewAccessor() - d, err := serializer.WatchDecoder(serializers, r) + d, err := serializer.CreateWatchDecoder(reqContentType, info.APIGroup, info.APIVersion, info.Resource, r) if err != nil { klog.Errorf("saveWatchObject ended with error, %v", err) return err @@ -300,8 +263,6 @@ func (cm *cacheManager) saveWatchObject(ctx context.Context, info *apirequest.Re switch watchType { case watch.Added, watch.Modified: - accessor.SetAPIVersion(obj, apiVersion) - accessor.SetKind(obj, kind) err = cm.saveOneObjectWithValidation(key, obj) if watchType == watch.Added { addObjCnt++ @@ -336,7 +297,7 @@ func (cm *cacheManager) saveWatchObject(ctx context.Context, info *apirequest.Re func (cm *cacheManager) saveListObject(ctx context.Context, info *apirequest.RequestInfo, b []byte) error { reqContentType, _ := util.ReqContentTypeFrom(ctx) respContentType, _ := util.RespContentTypeFrom(ctx) - serializers, err := cm.serializerManager.CreateSerializers(reqContentType, info.APIGroup, info.APIVersion) + serializers, err := cm.serializerManager.CreateSerializers(reqContentType, info.APIGroup, info.APIVersion, info.Resource) if err != nil { klog.Errorf("failed to create serializers in saveListObject, %v", err) return err @@ -362,7 +323,7 @@ func (cm *cacheManager) saveListObject(ctx context.Context, info *apirequest.Req } klog.V(5).Infof("list items for %s is: %d", util.ReqInfoString(info), len(items)) - kind := resourceToKindMap[info.Resource] + kind := strings.TrimSuffix(list.GetObjectKind().GroupVersionKind().Kind, "List") apiVersion := schema.GroupVersion{ Group: info.APIGroup, Version: info.APIVersion, @@ -424,7 +385,7 @@ func (cm *cacheManager) saveOneObject(ctx context.Context, info *apirequest.Requ reqContentType, _ := util.ReqContentTypeFrom(ctx) respContentType, _ := util.RespContentTypeFrom(ctx) - serializers, err := cm.serializerManager.CreateSerializers(reqContentType, info.APIGroup, info.APIVersion) + serializers, err := cm.serializerManager.CreateSerializers(reqContentType, info.APIGroup, info.APIVersion, info.Resource) if err != nil { klog.Errorf("failed to create serializers in saveOneObject: %s, %v", util.ReqInfoString(info), err) return err @@ -444,15 +405,6 @@ func (cm *cacheManager) saveOneObject(ctx context.Context, info *apirequest.Requ // it's not need to cache for status return nil } - - kind := resourceToKindMap[info.Resource] - apiVersion := schema.GroupVersion{ - Group: info.APIGroup, - Version: info.APIVersion, - }.String() - - accessor.SetKind(obj, kind) - accessor.SetAPIVersion(obj, apiVersion) } var name string @@ -583,9 +535,5 @@ func (cm *cacheManager) CanCacheFor(req *http.Request) bool { return false } - if _, ok := resourceToKindMap[info.Resource]; !ok { - return false - } - return true } diff --git a/pkg/yurthub/cachemanager/cache_manager_test.go b/pkg/yurthub/cachemanager/cache_manager_test.go index 88d03368cb5..5b58821a2ba 100644 --- a/pkg/yurthub/cachemanager/cache_manager_test.go +++ b/pkg/yurthub/cachemanager/cache_manager_test.go @@ -35,6 +35,7 @@ import ( nodev1beta1 "k8s.io/api/node/v1beta1" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" runtimeserializer "k8s.io/apimachinery/pkg/runtime/serializer" runtimejson "k8s.io/apimachinery/pkg/runtime/serializer/json" @@ -71,6 +72,7 @@ func TestCacheResponse(t *testing.T) { accept string verb string path string + resource string namespaced bool expectResult expectData }{ @@ -94,6 +96,7 @@ func TestCacheResponse(t *testing.T) { accept: "application/json", verb: "GET", path: "/api/v1/namespaces/default/pods/mypod1", + resource: "pods", namespaced: true, expectResult: expectData{ rv: "1", @@ -122,6 +125,7 @@ func TestCacheResponse(t *testing.T) { accept: "application/json", verb: "GET", path: "/api/v1/namespaces/default/pods/mypod2", + resource: "pods", namespaced: true, expectResult: expectData{ rv: "3", @@ -149,6 +153,7 @@ func TestCacheResponse(t *testing.T) { accept: "application/json", verb: "GET", path: "/api/v1/nodes/mynode1", + resource: "nodes", namespaced: false, expectResult: expectData{ rv: "4", @@ -175,6 +180,7 @@ func TestCacheResponse(t *testing.T) { accept: "application/json", verb: "GET", path: "/api/v1/nodes/mynode2", + resource: "nodes", namespaced: false, expectResult: expectData{ rv: "6", @@ -182,13 +188,127 @@ func TestCacheResponse(t *testing.T) { kind: "Node", }, }, + + //used to test whether custom resources can be cached correctly + { + desc: "cache response for get crontab", + group: "stable.example.com", + version: "v1", + key: "kubelet/crontabs/default/crontab1", + inputObj: runtime.Object(&unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "stable.example.com/v1", + "kind": "CronTab", + "metadata": map[string]interface{}{ + "name": "crontab1", + "namespace": "default", + "resourceVersion": "1", + }, + }, + }), + userAgent: "kubelet", + accept: "application/json", + verb: "GET", + path: "/apis/stable.example.com/v1/namespaces/default/crontabs/crontab1", + resource: "crontabs", + namespaced: true, + expectResult: expectData{ + rv: "1", + name: "crontab1", + ns: "default", + kind: "CronTab", + }, + }, + { + desc: "cache response for get crontab2", + group: "stable.example.com", + version: "v1", + key: "kubelet/crontabs/default/crontab2", + inputObj: runtime.Object(&unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "stable.example.com/v1", + "kind": "CronTab", + "metadata": map[string]interface{}{ + "name": "crontab2", + "namespace": "default", + "resourceVersion": "3", + }, + }, + }), + userAgent: "kubelet", + accept: "application/json", + verb: "GET", + path: "/apis/stable.example.com/v1/namespaces/default/crontabs/crontab2", + resource: "crontabs", + namespaced: true, + expectResult: expectData{ + rv: "3", + name: "crontab2", + ns: "default", + kind: "CronTab", + }, + }, + { + desc: "cache response for get foo without namespace", + group: "samplecontroller.k8s.io", + version: "v1", + key: "kubelet/foos/foo1", + inputObj: runtime.Object(&unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "samplecontroller.k8s.io/v1", + "kind": "Foo", + "metadata": map[string]interface{}{ + "name": "foo1", + "resourceVersion": "3", + }, + }, + }), + userAgent: "kubelet", + accept: "application/json", + verb: "GET", + path: "/apis/samplecontroller.k8s.io/v1/foos/foo1", + resource: "foos", + namespaced: false, + expectResult: expectData{ + rv: "3", + name: "foo1", + kind: "Foo", + }, + }, + { + desc: "cache response for get foo2 without namespace", + group: "samplecontroller.k8s.io", + version: "v1", + key: "kubelet/foos/foo2", + inputObj: runtime.Object(&unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "samplecontroller.k8s.io/v1", + "kind": "Foo", + "metadata": map[string]interface{}{ + "name": "foo2", + "resourceVersion": "5", + }, + }, + }), + userAgent: "kubelet", + accept: "application/json", + verb: "GET", + path: "/apis/samplecontroller.k8s.io/v1/foos/foo2", + resource: "foos", + namespaced: false, + expectResult: expectData{ + rv: "5", + name: "foo2", + kind: "Foo", + }, + }, } accessor := meta.NewAccessor() resolver := newTestRequestInfoResolver() for _, tt := range tests { t.Run(tt.desc, func(t *testing.T) { - encoder, err := serializerM.CreateSerializers(tt.accept, tt.group, tt.version) + encoder, err := serializerM.CreateSerializers(tt.accept, tt.group, tt.version, tt.resource) if err != nil { t.Fatalf("could not create serializer, %v", err) } @@ -284,6 +404,21 @@ func TestCacheResponseForWatch(t *testing.T) { } } + //used to generate the custom resources + mkCronTab := func(id string, rv string) *unstructured.Unstructured { + return &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "stable.example.com/v1", + "kind": "CronTab", + "metadata": map[string]interface{}{ + "name": id, + "namespace": "default", + "resourceVersion": rv, + }, + }, + } + } + storage := NewFakeStorageWrapper() serializerM := serializer.NewSerializerManager() yurtCM := &cacheManager{ @@ -396,6 +531,95 @@ func TestCacheResponseForWatch(t *testing.T) { }, }, }, + + //used to test whether custom resource's watch-events can be cached correctly + { + desc: "cache response for watch add crontabs", + group: "stable.example.com", + version: "v1", + key: "kubelet/crontabs/default", + inputObj: []watch.Event{ + {Type: watch.Added, Object: mkCronTab("crontab1", "2")}, + {Type: watch.Added, Object: mkCronTab("crontab2", "4")}, + {Type: watch.Added, Object: mkCronTab("crontab3", "6")}, + }, + userAgent: "kubelet", + accept: "application/json", + verb: "GET", + path: "/apis/stable.example.com/v1/namespaces/default/crontabs?watch=true", + namespaced: true, + expectResult: expectData{ + data: map[string]struct{}{ + "crontab-default-crontab1-2": {}, + "crontab-default-crontab2-4": {}, + "crontab-default-crontab3-6": {}, + }, + }, + }, + { + desc: "cache response for watch add and delete crontabs", + group: "stable.example.com", + version: "v1", + key: "kubelet/crontabs/default", + inputObj: []watch.Event{ + {Type: watch.Added, Object: mkCronTab("crontab1", "2")}, + {Type: watch.Deleted, Object: mkCronTab("crontab1", "4")}, + {Type: watch.Added, Object: mkCronTab("crontab3", "6")}, + }, + userAgent: "kubelet", + accept: "application/json", + verb: "GET", + path: "/apis/stable.example.com/v1/namespaces/default/crontabs?watch=true", + namespaced: true, + expectResult: expectData{ + data: map[string]struct{}{ + "crontab-default-crontab3-6": {}, + }, + }, + }, + { + desc: "cache response for watch add and update crontabs", + group: "stable.example.com", + version: "v1", + key: "kubelet/crontabs/default", + inputObj: []watch.Event{ + {Type: watch.Added, Object: mkCronTab("crontab1", "2")}, + {Type: watch.Modified, Object: mkCronTab("crontab1", "4")}, + {Type: watch.Added, Object: mkCronTab("crontab3", "6")}, + }, + userAgent: "kubelet", + accept: "application/json", + verb: "GET", + path: "/apis/stable.example.com/v1/namespaces/default/crontabs?watch=true", + namespaced: true, + expectResult: expectData{ + data: map[string]struct{}{ + "crontab-default-crontab1-4": {}, + "crontab-default-crontab3-6": {}, + }, + }, + }, + { + desc: "cache response for watch not update crontabs", + group: "stable.example.com", + version: "v1", + key: "kubelet/crontabs/default", + inputObj: []watch.Event{ + {Type: watch.Added, Object: mkCronTab("crontab1", "6")}, + {Type: watch.Modified, Object: mkCronTab("crontab1", "4")}, + {Type: watch.Modified, Object: mkCronTab("crontab1", "2")}, + }, + userAgent: "kubelet", + accept: "application/json", + verb: "GET", + path: "/apis/stable.example.com/v1/namespaces/default/crontabs?watch=true", + namespaced: true, + expectResult: expectData{ + data: map[string]struct{}{ + "crontab-default-crontab1-6": {}, + }, + }, + }, } accessor := meta.NewAccessor() @@ -404,6 +628,7 @@ func TestCacheResponseForWatch(t *testing.T) { t.Run(tt.desc, func(t *testing.T) { r, w := io.Pipe() go func(w *io.PipeWriter) { + //For unregistered GVKs, the normal encoding is used by default and the original GVK information is set encoder := restclientwatch.NewEncoder(streaming.NewEncoder(w, getEncoder()), getEncoder()) for i := range tt.inputObj { @@ -451,7 +676,6 @@ func TestCacheResponseForWatch(t *testing.T) { objs, err := storage.List(tt.key) if err != nil || len(objs) == 0 { t.Errorf("failed to get object from storage") - } if len(objs) != len(tt.expectResult.data) { @@ -504,6 +728,7 @@ func TestCacheResponseForList(t *testing.T) { accept string verb string path string + resource string namespaced bool expectResult expectData }{ @@ -562,6 +787,7 @@ func TestCacheResponseForList(t *testing.T) { accept: "application/json", verb: "GET", path: "/api/v1/namespaces/default/pods", + resource: "pods", namespaced: true, expectResult: expectData{ data: map[string]struct{}{ @@ -633,6 +859,7 @@ func TestCacheResponseForList(t *testing.T) { accept: "application/json", verb: "GET", path: "/api/v1/nodes", + resource: "nodes", namespaced: false, expectResult: expectData{ data: map[string]struct{}{ @@ -708,13 +935,123 @@ func TestCacheResponseForList(t *testing.T) { data: map[string]struct{}{}, }, }, + + //used to test whether custom resource list can be cached correctly + { + desc: "cache response for list crontabs", + group: "stable.example.com", + version: "v1", + key: "kubelet/crontabs/default", + inputObj: runtime.Object( + &unstructured.UnstructuredList{ + Object: map[string]interface{}{ + "apiVersion": "stable.example.com/v1", + "kind": "CronTabList", + "metadata": map[string]interface{}{ + "continue": "", + "resourceVersion": "2", + "selfLink": "/apis/stable.example.com/v1/namespaces/default/crontabs", + }, + }, + Items: []unstructured.Unstructured{ + { + Object: map[string]interface{}{ + "apiVersion": "stable.example.com/v1", + "kind": "CronTab", + "metadata": map[string]interface{}{ + "name": "crontab1", + "namespace": "default", + "resourceVersion": "1", + }, + }, + }, + { + Object: map[string]interface{}{ + "apiVersion": "stable.example.com/v1", + "kind": "CronTab", + "metadata": map[string]interface{}{ + "name": "crontab2", + "namespace": "default", + "resourceVersion": "2", + }, + }, + }, + }, + }, + ), + userAgent: "kubelet", + accept: "application/json", + verb: "GET", + path: "/apis/stable.example.com/v1/namespaces/default/crontabs", + resource: "crontabs", + namespaced: true, + expectResult: expectData{ + data: map[string]struct{}{ + "crontab-default-crontab1-1": {}, + "crontab-default-crontab2-2": {}, + }, + }, + }, + { + desc: "cache response for list foos without namespace", + group: "samplecontroller.k8s.io", + version: "v1", + key: "kubelet/foos", + inputObj: runtime.Object( + &unstructured.UnstructuredList{ + Object: map[string]interface{}{ + "apiVersion": "samplecontroller.k8s.io/v1", + "kind": "FooList", + "metadata": map[string]interface{}{ + "continue": "", + "resourceVersion": "2", + "selfLink": "/apis/samplecontroller.k8s.io/v1/foos", + }, + }, + Items: []unstructured.Unstructured{ + { + Object: map[string]interface{}{ + "apiVersion": "samplecontroller.k8s.io/v1", + "kind": "Foo", + "metadata": map[string]interface{}{ + "name": "foo1", + "resourceVersion": "1", + }, + }, + }, + { + Object: map[string]interface{}{ + "apiVersion": "samplecontroller.k8s.io/v1", + "kind": "Foo", + "metadata": map[string]interface{}{ + "name": "foo2", + "resourceVersion": "2", + }, + }, + }, + }, + }, + ), + userAgent: "kubelet", + accept: "application/json", + verb: "GET", + path: "/apis/samplecontroller.k8s.io/v1/foos", + resource: "foos", + namespaced: false, + expectResult: expectData{ + data: map[string]struct{}{ + "foo-foo1-1": {}, + "foo-foo2-2": {}, + }, + }, + }, } accessor := meta.NewAccessor() resolver := newTestRequestInfoResolver() for _, tt := range tests { t.Run(tt.desc, func(t *testing.T) { - encoder, err := serializerM.CreateSerializers(tt.accept, tt.group, tt.version) + encoder, err := serializerM.CreateSerializers(tt.accept, tt.group, tt.version, tt.resource) if err != nil { t.Fatalf("could not create serializer, %v", err) } @@ -953,6 +1290,213 @@ func TestQueryCacheForGet(t *testing.T) { kind: "Node", }, }, + + //used to test whether the query local Custom Resource request can be handled correctly + { + desc: "no client", + accept: "application/json", + verb: "GET", + path: "/apis/stable.example.com/v1/namespaces/default/crontabs/crontab1", + namespaced: true, + expectResult: expectData{ + err: true, + }, + }, + { + desc: "query post crontab", + key: "kubelet/crontabs/default/crontab1", + inputObj: runtime.Object(&unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "stable.example.com/v1", + "kind": "CronTab", + "metadata": map[string]interface{}{ + "name": "crontab1", + "namespace": "default", + "resourceVersion": "1", + }, + }, + }), + userAgent: "kubelet", + accept: "application/json", + verb: "POST", + path: "/apis/stable.example.com/v1/namespaces/default/crontabs/crontab1", + namespaced: true, + expectResult: expectData{ + err: true, + }, + }, + { + desc: "query get crontab", + key: "kubelet/crontabs/default/crontab1", + inputObj: runtime.Object(&unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "stable.example.com/v1", + "kind": "CronTab", + "metadata": map[string]interface{}{ + "name": "crontab1", + "namespace": "default", + "resourceVersion": "1", + }, + }, + }), + userAgent: "kubelet", + accept: "application/json", + verb: "GET", + path: "/apis/stable.example.com/v1/namespaces/default/crontabs/crontab1", + namespaced: true, + expectResult: expectData{ + rv: "1", + name: "crontab1", + ns: "default", + kind: "CronTab", + }, + }, + { + desc: "query update crontab", + key: "kubelet/crontabs/default/crontab2", + inputObj: runtime.Object(&unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "stable.example.com/v1", + "kind": "CronTab", + "metadata": map[string]interface{}{ + "name": "crontab2", + "namespace": "default", + "resourceVersion": "2", + }, + }, + }), + userAgent: "kubelet", + accept: "application/json", + verb: "PUT", + path: "/apis/stable.example.com/v1/namespaces/default/crontabs/crontab2", + namespaced: true, + expectResult: expectData{ + rv: "2", + name: "crontab2", + ns: "default", + kind: "CronTab", + }, + }, + { + desc: "query patch crontab", + key: "kubelet/crontabs/default/crontab3", + inputObj: runtime.Object(&unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "stable.example.com/v1", + "kind": "CronTab", + "metadata": map[string]interface{}{ + "name": "crontab3", + "namespace": "default", + "resourceVersion": "4", + }, + }, + }), + userAgent: "kubelet", + accept: "application/json", + verb: "PATCH", + path: "/apis/stable.example.com/v1/namespaces/default/crontabs/crontab3/status", + namespaced: true, + expectResult: expectData{ + rv: "4", + name: "crontab3", + ns: "default", + kind: "CronTab", + }, + }, + { + desc: "query post foo", + key: "kubelet/foos/foo1", + inputObj: runtime.Object(&unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "samplecontroller.k8s.io/v1", + "kind": "Foo", + "metadata": map[string]interface{}{ + "name": "foo1", + "resourceVersion": "1", + }, + }, + }), + userAgent: "kubelet", + accept: "application/json", + verb: "POST", + path: "/apis/samplecontroller.k8s.io/v1/foos/foo1", + namespaced: false, + expectResult: expectData{ + err: true, + }, + }, + { + desc: "query get foo", + key: "kubelet/foos/foo1", + inputObj: runtime.Object(&unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "samplecontroller.k8s.io/v1", + "kind": "Foo", + "metadata": map[string]interface{}{ + "name": "foo1", + "resourceVersion": "1", + }, + }, + }), + userAgent: "kubelet", + accept: "application/json", + verb: "GET", + path: "/apis/samplecontroller.k8s.io/v1/foos/foo1", + namespaced: false, + expectResult: expectData{ + rv: "1", + name: "foo1", + kind: "Foo", + }, + }, + { + desc: "query update foo", + key: "kubelet/foos/foo2", + inputObj: runtime.Object(&unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "samplecontroller.k8s.io/v1", + "kind": "Foo", + "metadata": map[string]interface{}{ + "name": "foo2", + "resourceVersion": "2", + }, + }, + }), + userAgent: "kubelet", + accept: "application/json", + verb: "PUT", + path: "/apis/samplecontroller.k8s.io/v1/foos/foo2", + namespaced: false, + expectResult: expectData{ + rv: "2", + name: "foo2", + kind: "Foo", + }, + }, + { + desc: "query patch foo", + key: "kubelet/foos/foo3", + inputObj: runtime.Object(&unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "samplecontroller.k8s.io/v1", + "kind": "Foo", + "metadata": map[string]interface{}{ + "name": "foo3", + "resourceVersion": "4", + }, + }, + }), + userAgent: "kubelet", + accept: "application/json", + verb: "PATCH", + path: "/apis/samplecontroller.k8s.io/v1/foos/foo3/status", + namespaced: false, + expectResult: expectData{ + rv: "4", + name: "foo3", + kind: "Foo", + }, + }, } accessor := meta.NewAccessor() @@ -1159,6 +1703,109 @@ func TestQueryCacheForList(t *testing.T) { }, }, }, + + //used to test whether the query local Custom Resource list request can be handled correctly + { + desc: "query list crontabs", + keyPrefix: "kubelet/crontabs/default", + inputObj: []runtime.Object{ + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "stable.example.com/v1", + "kind": "CronTab", + "metadata": map[string]interface{}{ + "name": "crontab1", + "namespace": "default", + "resourceVersion": "1", + }, + }, + }, + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "stable.example.com/v1", + "kind": "CronTab", + "metadata": map[string]interface{}{ + "name": "crontab2", + "namespace": "default", + "resourceVersion": "2", + }, + }, + }, + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "stable.example.com/v1", + "kind": "CronTab", + "metadata": map[string]interface{}{ + "name": "crontab3", + "namespace": "default", + "resourceVersion": "5", + }, + }, + }, + }, + userAgent: "kubelet", + accept: "application/json", + verb: "GET", + path: "/apis/stable.example.com/v1/namespaces/default/crontabs", + namespaced: true, + expectResult: expectData{ + rv: "5", + data: map[string]struct{}{ + "crontab-default-crontab1-1": {}, + "crontab-default-crontab2-2": {}, + "crontab-default-crontab3-5": {}, + }, + }, + }, + { + desc: "query list foos", + keyPrefix: "kubelet/foos", + inputObj: []runtime.Object{ + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "samplecontroller.k8s.io/v1", + "kind": "Foo", + "metadata": map[string]interface{}{ + "name": "foo1", + "resourceVersion": "1", + }, + }, + }, + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "samplecontroller.k8s.io/v1", + "kind": "Foo", + "metadata": map[string]interface{}{ + "name": "foo2", + "resourceVersion": "2", + }, + }, + }, + &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "samplecontroller.k8s.io/v1", + "kind": "Foo", + "metadata": map[string]interface{}{ + "name": "foo3", + "resourceVersion": "5", + }, + }, + }, + }, + userAgent: "kubelet", + accept: "application/json", + verb: "GET", + path: "/apis/samplecontroller.k8s.io/v1/foos", + namespaced: false, + expectResult: expectData{ + rv: "5", + data: map[string]struct{}{ + "foo-foo1-1": {}, + "foo-foo2-2": {}, + "foo-foo3-5": {}, + }, + }, + }, } accessor := meta.NewAccessor() diff --git a/pkg/yurthub/cachemanager/storage_wrapper.go b/pkg/yurthub/cachemanager/storage_wrapper.go index 15cf6d5864d..fe75833bb12 100644 --- a/pkg/yurthub/cachemanager/storage_wrapper.go +++ b/pkg/yurthub/cachemanager/storage_wrapper.go @@ -20,9 +20,10 @@ import ( "bytes" "sync" + "github.com/openyurtio/openyurt/pkg/yurthub/storage" "github.com/openyurtio/openyurt/pkg/yurthub/util" - "github.com/openyurtio/openyurt/pkg/yurthub/storage" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/serializer/json" "k8s.io/client-go/kubernetes/scheme" @@ -53,7 +54,7 @@ type storageWrapper struct { func NewStorageWrapper(storage storage.Store) StorageWrapper { return &storageWrapper{ store: storage, - backendSerializer: json.NewSerializer(json.DefaultMetaFactory, scheme.Scheme, scheme.Scheme, false), + backendSerializer: json.NewSerializerWithOptions(json.DefaultMetaFactory, scheme.Scheme, scheme.Scheme, json.SerializerOptions{}), cache: make(map[string]runtime.Object), } } @@ -118,8 +119,18 @@ func (sw *storageWrapper) Get(key string) (runtime.Object, error) { } else if len(b) == 0 { return nil, nil } - - obj, gvk, err := sw.backendSerializer.Decode(b, nil, nil) + //get the gvk from json data + gvk, err := json.DefaultMetaFactory.Interpret(b) + if err != nil { + return nil, err + } + var UnstructuredObj runtime.Object + if scheme.Scheme.Recognizes(*gvk) { + UnstructuredObj = nil + } else { + UnstructuredObj = new(unstructured.Unstructured) + } + obj, gvk, err := sw.backendSerializer.Decode(b, nil, UnstructuredObj) if err != nil { klog.Errorf("could not decode %v for %s, %v", gvk, key, err) return nil, err @@ -156,9 +167,19 @@ func (sw *storageWrapper) List(key string) ([]runtime.Object, error) { } return objects, nil } - + //get the gvk from json data + gvk, err := json.DefaultMetaFactory.Interpret(bb[0]) + if err != nil { + return nil, err + } + var UnstructuredObj runtime.Object + if scheme.Scheme.Recognizes(*gvk) { + UnstructuredObj = nil + } else { + UnstructuredObj = new(unstructured.Unstructured) + } for i := range bb { - obj, gvk, err := sw.backendSerializer.Decode(bb[i], nil, nil) + obj, gvk, err := sw.backendSerializer.Decode(bb[i], nil, UnstructuredObj) if err != nil { klog.Errorf("could not decode %v for %s, %v", gvk, key, err) continue diff --git a/pkg/yurthub/kubernetes/serializer/serializer.go b/pkg/yurthub/kubernetes/serializer/serializer.go index c901d4a1840..3d6dd39cb34 100644 --- a/pkg/yurthub/kubernetes/serializer/serializer.go +++ b/pkg/yurthub/kubernetes/serializer/serializer.go @@ -20,11 +20,16 @@ import ( "fmt" "io" "mime" + "strings" + "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured/unstructuredscheme" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/serializer" + "k8s.io/apimachinery/pkg/runtime/serializer/json" "k8s.io/apimachinery/pkg/runtime/serializer/streaming" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" @@ -35,30 +40,174 @@ import ( // YurtHubSerializer is a global serializer manager for yurthub var YurtHubSerializer = NewSerializerManager() +// UnsafeDefaultRESTMapper is only used to check whether the GVK is in the scheme according to the GVR information +var UnsafeDefaultRESTMapper = NewDefaultRESTMapperFromScheme() + +func NewDefaultRESTMapperFromScheme() *meta.DefaultRESTMapper { + scheme := scheme.Scheme + defaultGroupVersions := scheme.PrioritizedVersionsAllGroups() + mapper := meta.NewDefaultRESTMapper(defaultGroupVersions) + // enumerate all supported versions, get the kinds, and register with the mapper how to address + // our resources. + for _, gv := range defaultGroupVersions { + for kind := range scheme.KnownTypes(gv) { + //Since RESTMapper is only used for mapping GVR to GVK information, + //the scope field is not involved in actual use, so all scope are currently set to meta.RESTScopeNamespace + scope := meta.RESTScopeNamespace + mapper.Add(gv.WithKind(kind), scope) + } + } + return mapper +} + // SerializerManager is responsible for managing *rest.Serializers type SerializerManager struct { // NegotiatedSerializer is used for obtaining encoders and decoders for multiple // supported media types. NegotiatedSerializer runtime.NegotiatedSerializer + // UnstructuredNegotiatedSerializer is used to obtain encoders and decoders + // for resources not registered in the scheme + UnstructuredNegotiatedSerializer runtime.NegotiatedSerializer } // NewSerializerManager creates a *SerializerManager object with no version conversion func NewSerializerManager() *SerializerManager { return &SerializerManager{ - NegotiatedSerializer: serializer.WithoutConversionCodecFactory{CodecFactory: scheme.Codecs}, // do not need version conversion + // do not need version conversion, and keep the gvk information + NegotiatedSerializer: WithVersionCodecFactory{CodecFactory: scheme.Codecs}, + UnstructuredNegotiatedSerializer: NewUnstructuredNegotiatedSerializer(), + } +} + +// WithVersionCodecFactory is a CodecFactory that will explicitly ignore requests to perform conversion. +// It keeps the gvk during deserialization. +// This wrapper is used while code migrates away from using conversion (such as external clients) +type WithVersionCodecFactory struct { + serializer.CodecFactory +} + +// EncoderForVersion returns an encoder that does not do conversion, but does set the group version kind of the object +// when serialized. +func (f WithVersionCodecFactory) EncoderForVersion(serializer runtime.Encoder, version runtime.GroupVersioner) runtime.Encoder { + return runtime.WithVersionEncoder{ + Version: version, + Encoder: serializer, + ObjectTyper: scheme.Scheme, + } +} + +// DecoderToVersion returns an decoder that does not do conversion, and keeps the gvk information +func (f WithVersionCodecFactory) DecoderToVersion(serializer runtime.Decoder, _ runtime.GroupVersioner) runtime.Decoder { + return WithVersionDecoder{ + Decoder: serializer, + } +} + +// WithVersionDecoder keeps the group version kind of a deserialized object. +type WithVersionDecoder struct { + runtime.Decoder +} + +// Decode does not do conversion. It keeps the gvk during deserialization. +func (d WithVersionDecoder) Decode(data []byte, defaults *schema.GroupVersionKind, into runtime.Object) (runtime.Object, *schema.GroupVersionKind, error) { + return d.Decoder.Decode(data, defaults, into) +} + +type UnstructuredNegotiatedSerializer struct { + scheme *runtime.Scheme + typer runtime.ObjectTyper + creator runtime.ObjectCreater +} + +// NewUnstructuredNegotiatedSerializer returns a negotiated serializer for Unstructured resources +func NewUnstructuredNegotiatedSerializer() runtime.NegotiatedSerializer { + return UnstructuredNegotiatedSerializer{ + scheme: scheme.Scheme, + typer: unstructuredscheme.NewUnstructuredObjectTyper(), + creator: NewUnstructuredCreator(), + } +} + +func (s UnstructuredNegotiatedSerializer) SupportedMediaTypes() []runtime.SerializerInfo { + return []runtime.SerializerInfo{ + { + MediaType: "application/json", + MediaTypeType: "application", + MediaTypeSubType: "json", + EncodesAsText: true, + Serializer: json.NewSerializerWithOptions(json.DefaultMetaFactory, s.creator, s.typer, json.SerializerOptions{}), + PrettySerializer: json.NewSerializerWithOptions(json.DefaultMetaFactory, s.creator, s.typer, json.SerializerOptions{Pretty: true}), + StreamSerializer: &runtime.StreamSerializerInfo{ + EncodesAsText: true, + Serializer: json.NewSerializerWithOptions(json.DefaultMetaFactory, s.creator, s.typer, json.SerializerOptions{}), + Framer: json.Framer, + }, + }, + { + MediaType: "application/yaml", + MediaTypeType: "application", + MediaTypeSubType: "yaml", + EncodesAsText: true, + Serializer: json.NewSerializerWithOptions(json.DefaultMetaFactory, s.creator, s.typer, json.SerializerOptions{Yaml: true}), + }, + } +} + +//EncoderForVersion do nothing, but returns a encoder, +//if the object is unstructured, the encoder will encode object without conversion +func (s UnstructuredNegotiatedSerializer) EncoderForVersion(encoder runtime.Encoder, gv runtime.GroupVersioner) runtime.Encoder { + return encoder +} + +//DecoderToVersion do nothing, and returns a decoder that does not do conversion +func (s UnstructuredNegotiatedSerializer) DecoderToVersion(decoder runtime.Decoder, _ runtime.GroupVersioner) runtime.Decoder { + return WithVersionDecoder{ + Decoder: decoder, + } +} + +type unstructuredCreator struct{} + +// NewUnstructuredCreator returns a simple object creator that always returns an Unstructured or UnstructuredList +func NewUnstructuredCreator() runtime.ObjectCreater { + return unstructuredCreator{} +} + +func (c unstructuredCreator) New(kind schema.GroupVersionKind) (runtime.Object, error) { + if strings.HasSuffix(kind.Kind, "List") { + ret := &unstructured.UnstructuredList{} + ret.SetGroupVersionKind(kind) + return ret, nil + } else { + ret := &unstructured.Unstructured{} + ret.SetGroupVersionKind(kind) + return ret, nil } } // CreateSerializers create a *rest.Serializers for encoding or decoding runtime object -func (sm *SerializerManager) CreateSerializers(contentType, group, version string) (*rest.Serializers, error) { - mediaTypes := sm.NegotiatedSerializer.SupportedMediaTypes() +func (sm *SerializerManager) CreateSerializers(contentType, group, version, resource string) (*rest.Serializers, error) { + var mediaTypes []runtime.SerializerInfo + gvr := schema.GroupVersionResource{ + Group: group, + Version: version, + Resource: resource, + } + _, kindErr := UnsafeDefaultRESTMapper.KindFor(gvr) + if kindErr == nil || resource == "WatchEvent" { + mediaTypes = sm.NegotiatedSerializer.SupportedMediaTypes() + } else { + mediaTypes = sm.UnstructuredNegotiatedSerializer.SupportedMediaTypes() + } mediaType, _, err := mime.ParseMediaType(contentType) if err != nil { return nil, fmt.Errorf("the content type(%s) specified in the request is not recognized: %v", contentType, err) } - info, ok := runtime.SerializerInfoForMediaType(mediaTypes, mediaType) if !ok { + if mediaType == "application/vnd.kubernetes.protobuf" && kindErr != nil { + return nil, fmt.Errorf("*unstructured.Unstructured(%s/%s) does not implement the protobuf marshalling interface and cannot be encoded to a protobuf message", group, version) + } if len(contentType) != 0 || len(mediaTypes) == 0 { return nil, fmt.Errorf("no serializers registered for %s", contentType) } @@ -80,17 +229,30 @@ func (sm *SerializerManager) CreateSerializers(contentType, group, version strin Group: group, Version: version, } + var encoder runtime.Encoder + var decoder runtime.Decoder + if kindErr == nil { + encoder = sm.NegotiatedSerializer.EncoderForVersion(info.Serializer, &reqGroupVersion) + decoder = sm.NegotiatedSerializer.DecoderToVersion(info.Serializer, internalGV) + } else { + encoder = sm.UnstructuredNegotiatedSerializer.EncoderForVersion(info.Serializer, &reqGroupVersion) + decoder = sm.UnstructuredNegotiatedSerializer.DecoderToVersion(info.Serializer, &reqGroupVersion) + } s := &rest.Serializers{ - Encoder: sm.NegotiatedSerializer.EncoderForVersion(info.Serializer, &reqGroupVersion), - Decoder: sm.NegotiatedSerializer.DecoderToVersion(info.Serializer, internalGV), + Encoder: encoder, + Decoder: decoder, RenegotiatedDecoder: func(contentType string, params map[string]string) (runtime.Decoder, error) { info, ok := runtime.SerializerInfoForMediaType(mediaTypes, contentType) if !ok { return nil, fmt.Errorf("serializer for %s not registered", contentType) } - return sm.NegotiatedSerializer.DecoderToVersion(info.Serializer, internalGV), nil + if kindErr == nil { + return sm.NegotiatedSerializer.DecoderToVersion(info.Serializer, internalGV), nil + } else { + return sm.UnstructuredNegotiatedSerializer.DecoderToVersion(info.Serializer, &reqGroupVersion), nil + } }, } if info.StreamSerializer != nil { @@ -129,14 +291,28 @@ func DecodeResp(serializers *rest.Serializers, b []byte, reqContentType, respCon switch out.(type) { case *metav1.Status: // it's not need to cache for status - return nil, nil + return out, nil } return out, nil } -// WatchDecoder generates a Decoder for watch response -func WatchDecoder(serializers *rest.Serializers, body io.ReadCloser) (*restclientwatch.Decoder, error) { +// CreateWatchDecoder generates a Decoder for watch response +func CreateWatchDecoder(contentType, group, version, resource string, body io.ReadCloser) (*restclientwatch.Decoder, error) { + //get the general serializers to decode the watch event + serializers, err := YurtHubSerializer.CreateSerializers(contentType, group, version, "WatchEvent") + if err != nil { + klog.Errorf("failed to create serializers in saveWatchObject, %v", err) + return nil, err + } + + //get the serializers to decode the embedded object inside watch event according to the GVR of embedded object + embeddedSerializers, err := YurtHubSerializer.CreateSerializers(contentType, group, version, resource) + if err != nil { + klog.Errorf("failed to create serializers in saveWatchObject, %v", err) + return nil, err + } + framer := serializers.Framer.NewFrameReader(body) streamingDecoder := streaming.NewDecoder(framer, serializers.StreamingSerializer) - return restclientwatch.NewDecoder(streamingDecoder, serializers.Decoder), nil + return restclientwatch.NewDecoder(streamingDecoder, embeddedSerializers.Decoder), nil } diff --git a/pkg/yurthub/proxy/local/local_test.go b/pkg/yurthub/proxy/local/local_test.go index d35c35c821b..84bc8b0c68e 100644 --- a/pkg/yurthub/proxy/local/local_test.go +++ b/pkg/yurthub/proxy/local/local_test.go @@ -371,6 +371,7 @@ func TestServeHTTPForGetReqCache(t *testing.T) { accept string verb string path string + resource string code int data expectData }{ @@ -391,10 +392,11 @@ func TestServeHTTPForGetReqCache(t *testing.T) { }, }, }, - accept: "application/json", - verb: "GET", - path: "/api/v1/namespaces/default/pods/mypod1", - code: http.StatusOK, + accept: "application/json", + verb: "GET", + path: "/api/v1/namespaces/default/pods/mypod1", + resource: "pods", + code: http.StatusOK, data: expectData{ ns: "default", name: "mypod1", @@ -407,7 +409,7 @@ func TestServeHTTPForGetReqCache(t *testing.T) { for _, tt := range testcases { t.Run(tt.desc, func(t *testing.T) { - jsonDecoder, _ := serializerM.CreateSerializers(tt.accept, "", "v1") + jsonDecoder, _ := serializerM.CreateSerializers(tt.accept, "", "v1", tt.resource) accessor := meta.NewAccessor() for i := range tt.inputObj { name, _ := accessor.Name(tt.inputObj[i]) @@ -495,6 +497,7 @@ func TestServeHTTPForListReqCache(t *testing.T) { accept string verb string path string + resource string code int expectD expectData }{ @@ -537,10 +540,11 @@ func TestServeHTTPForListReqCache(t *testing.T) { }, }, }, - accept: "application/json", - verb: "GET", - path: "/api/v1/namespaces/default/pods", - code: http.StatusOK, + accept: "application/json", + verb: "GET", + path: "/api/v1/namespaces/default/pods", + resource: "pods", + code: http.StatusOK, expectD: expectData{ rv: "6", data: map[string]struct{}{ @@ -556,7 +560,7 @@ func TestServeHTTPForListReqCache(t *testing.T) { for _, tt := range testcases { t.Run(tt.desc, func(t *testing.T) { - jsonDecoder, _ := serializerM.CreateSerializers(tt.accept, "", "v1") + jsonDecoder, _ := serializerM.CreateSerializers(tt.accept, "", "v1", tt.resource) accessor := meta.NewAccessor() for i := range tt.inputObj { name, _ := accessor.Name(tt.inputObj[i])