From 6171e979369a52a733449360be73a6e257e03d22 Mon Sep 17 00:00:00 2001 From: chentao <421224811@qq.com> Date: Tue, 16 Mar 2021 18:15:40 +0800 Subject: [PATCH] Enhance Yurthub caching ability --- pkg/yurthub/cachemanager/cache_manager.go | 101 +++------- .../cachemanager/cache_manager_test.go | 12 +- pkg/yurthub/cachemanager/storage_wrapper.go | 33 +++- .../kubernetes/serializer/serializer.go | 182 +++++++++++++++++- pkg/yurthub/proxy/local/local_test.go | 24 ++- 5 files changed, 253 insertions(+), 99 deletions(-) diff --git a/pkg/yurthub/cachemanager/cache_manager.go b/pkg/yurthub/cachemanager/cache_manager.go index b1052d93679..98345bb8aee 100644 --- a/pkg/yurthub/cachemanager/cache_manager.go +++ b/pkg/yurthub/cachemanager/cache_manager.go @@ -25,6 +25,7 @@ import ( "os" "path" "strconv" + "strings" "sync" "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer" @@ -33,6 +34,7 @@ import ( "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/watch" @@ -42,40 +44,6 @@ import ( "k8s.io/klog" ) -var ( - resourceToKindMap = map[string]string{ - "nodes": "Node", - "pods": "Pod", - "services": "Service", - "namespaces": "Namespace", - "endpoints": "Endpoints", - "configmaps": "ConfigMap", - "persistentvolumes": "PersistentVolume", - "persistentvolumeclaims": "PersistentVolumeClaim", - "events": "Event", - "secrets": "Secret", - "leases": "Lease", - "runtimeclasses": "RuntimeClass", - "csidrivers": "CSIDriver", - } - - resourceToListKindMap = map[string]string{ - "nodes": "NodeList", - "pods": "PodList", - "services": "ServiceList", - "namespaces": "NamespaceList", - "endpoints": "EndpointsList", - "configmaps": "ConfigMapList", - "persistentvolumes": "PersistentVolumeList", - "persistentvolumeclaims": "PersistentVolumeClaimList", - "events": "EventList", - "secrets": "SecretList", - "leases": "LeaseList", - "runtimeclasses": "RuntimeClassList", - "csidrivers": "CSIDriverList", - } -) - // CacheManager is an adaptor to cache runtime object data into backend storage type CacheManager interface { CacheResponse(ctx context.Context, prc io.ReadCloser, stopCh <-chan struct{}) error @@ -169,26 +137,31 @@ func (cm *cacheManager) queryListObject(req *http.Request) (runtime.Object, erro comp, _ := util.ClientComponentFrom(ctx) info, _ := apirequest.RequestInfoFrom(ctx) - listKind := resourceToListKindMap[info.Resource] - listGvk := schema.GroupVersionKind{ - Group: info.APIGroup, - Version: info.APIVersion, - Kind: listKind, - } - - listObj, err := scheme.Scheme.New(listGvk) + key, err := util.KeyFunc(comp, info.Resource, info.Namespace, info.Name) if err != nil { - klog.Errorf("failed to create list object(%v), %v", listGvk, err) return nil, err } - key, err := util.KeyFunc(comp, info.Resource, info.Namespace, info.Name) + objs, err := cm.storage.List(key) if err != nil { return nil, err + } else if len(objs) == 0 { + return nil, nil + } + var listObj runtime.Object + listGvk := schema.GroupVersionKind{ + Group: info.APIGroup, + Version: info.APIVersion, + Kind: objs[0].GetObjectKind().GroupVersionKind().Kind + "List", + } + if scheme.Scheme.Recognizes(listGvk) { + listObj, err = scheme.Scheme.New(listGvk) + } else { + listObj = new(unstructured.UnstructuredList) + listObj.GetObjectKind().SetGroupVersionKind(listGvk) } - - objs, err := cm.storage.List(key) if err != nil { + klog.Errorf("failed to create list object(%v), %v", listGvk, err) return nil, err } @@ -248,20 +221,21 @@ func (cm *cacheManager) saveWatchObject(ctx context.Context, info *apirequest.Re comp, _ := util.ClientComponentFrom(ctx) reqContentType, _ := util.ReqContentTypeFrom(ctx) - serializers, err := cm.serializerManager.CreateSerializers(reqContentType, info.APIGroup, info.APIVersion) + serializers, err := cm.serializerManager.CreateSerializers(reqContentType, "", "v1", "events") + if err != nil { + klog.Errorf("failed to create serializers in saveWatchObject, %v", err) + return err + } + + embeddedSerializers, err := cm.serializerManager.CreateSerializers(reqContentType, info.APIGroup, info.APIVersion, info.Resource) if err != nil { klog.Errorf("failed to create serializers in saveWatchObject, %v", err) return err } - kind := resourceToKindMap[info.Resource] - apiVersion := schema.GroupVersion{ - Group: info.APIGroup, - Version: info.APIVersion, - }.String() accessor := meta.NewAccessor() - d, err := serializer.WatchDecoder(serializers, r) + d, err := serializer.WatchDecoder(serializers, embeddedSerializers, r) if err != nil { klog.Errorf("saveWatchObject ended with error, %v", err) return err @@ -300,8 +274,6 @@ func (cm *cacheManager) saveWatchObject(ctx context.Context, info *apirequest.Re switch watchType { case watch.Added, watch.Modified: - accessor.SetAPIVersion(obj, apiVersion) - accessor.SetKind(obj, kind) err = cm.saveOneObjectWithValidation(key, obj) if watchType == watch.Added { addObjCnt++ @@ -336,7 +308,7 @@ func (cm *cacheManager) saveWatchObject(ctx context.Context, info *apirequest.Re func (cm *cacheManager) saveListObject(ctx context.Context, info *apirequest.RequestInfo, b []byte) error { reqContentType, _ := util.ReqContentTypeFrom(ctx) respContentType, _ := util.RespContentTypeFrom(ctx) - serializers, err := cm.serializerManager.CreateSerializers(reqContentType, info.APIGroup, info.APIVersion) + serializers, err := cm.serializerManager.CreateSerializers(reqContentType, info.APIGroup, info.APIVersion, info.Resource) if err != nil { klog.Errorf("failed to create serializers in saveListObject, %v", err) return err @@ -362,7 +334,7 @@ func (cm *cacheManager) saveListObject(ctx context.Context, info *apirequest.Req } klog.V(5).Infof("list items for %s is: %d", util.ReqInfoString(info), len(items)) - kind := resourceToKindMap[info.Resource] + kind := strings.TrimSuffix(list.GetObjectKind().GroupVersionKind().Kind, "List") apiVersion := schema.GroupVersion{ Group: info.APIGroup, Version: info.APIVersion, @@ -415,7 +387,7 @@ func (cm *cacheManager) saveOneObject(ctx context.Context, info *apirequest.Requ reqContentType, _ := util.ReqContentTypeFrom(ctx) respContentType, _ := util.RespContentTypeFrom(ctx) - serializers, err := cm.serializerManager.CreateSerializers(reqContentType, info.APIGroup, info.APIVersion) + serializers, err := cm.serializerManager.CreateSerializers(reqContentType, info.APIGroup, info.APIVersion, info.Resource) if err != nil { klog.Errorf("failed to create serializers in saveOneObject: %s, %v", util.ReqInfoString(info), err) return err @@ -435,15 +407,6 @@ func (cm *cacheManager) saveOneObject(ctx context.Context, info *apirequest.Requ // it's not need to cache for status return nil } - - kind := resourceToKindMap[info.Resource] - apiVersion := schema.GroupVersion{ - Group: info.APIGroup, - Version: info.APIVersion, - }.String() - - accessor.SetKind(obj, kind) - accessor.SetAPIVersion(obj, apiVersion) } var name string @@ -574,9 +537,5 @@ func (cm *cacheManager) CanCacheFor(req *http.Request) bool { return false } - if _, ok := resourceToKindMap[info.Resource]; !ok { - return false - } - return true } diff --git a/pkg/yurthub/cachemanager/cache_manager_test.go b/pkg/yurthub/cachemanager/cache_manager_test.go index 20990be806f..b2115d3f7f4 100644 --- a/pkg/yurthub/cachemanager/cache_manager_test.go +++ b/pkg/yurthub/cachemanager/cache_manager_test.go @@ -70,6 +70,7 @@ func TestCacheResponse(t *testing.T) { accept string verb string path string + resource string namespaced bool expectResult expectData }{ @@ -93,6 +94,7 @@ func TestCacheResponse(t *testing.T) { accept: "application/json", verb: "GET", path: "/api/v1/namespaces/default/pods/mypod1", + resource: "pods", namespaced: true, expectResult: expectData{ rv: "1", @@ -121,6 +123,7 @@ func TestCacheResponse(t *testing.T) { accept: "application/json", verb: "GET", path: "/api/v1/namespaces/default/pods/mypod2", + resource: "pods", namespaced: true, expectResult: expectData{ rv: "3", @@ -148,6 +151,7 @@ func TestCacheResponse(t *testing.T) { accept: "application/json", verb: "GET", path: "/api/v1/nodes/mynode1", + resource: "nodes", namespaced: false, expectResult: expectData{ rv: "4", @@ -174,6 +178,7 @@ func TestCacheResponse(t *testing.T) { accept: "application/json", verb: "GET", path: "/api/v1/nodes/mynode2", + resource: "nodes", namespaced: false, expectResult: expectData{ rv: "6", @@ -187,7 +192,7 @@ func TestCacheResponse(t *testing.T) { resolver := newTestRequestInfoResolver() for _, tt := range tests { t.Run(tt.desc, func(t *testing.T) { - encoder, err := serializerM.CreateSerializers(tt.accept, tt.group, tt.version) + encoder, err := serializerM.CreateSerializers(tt.accept, tt.group, tt.version, tt.resource) if err != nil { t.Fatalf("could not create serializer, %v", err) } @@ -503,6 +508,7 @@ func TestCacheResponseForList(t *testing.T) { accept string verb string path string + resource string namespaced bool expectResult expectData }{ @@ -561,6 +567,7 @@ func TestCacheResponseForList(t *testing.T) { accept: "application/json", verb: "GET", path: "/api/v1/namespaces/default/pods", + resource: "pods", namespaced: true, expectResult: expectData{ data: map[string]struct{}{ @@ -632,6 +639,7 @@ func TestCacheResponseForList(t *testing.T) { accept: "application/json", verb: "GET", path: "/api/v1/nodes", + resource: "nodes", namespaced: false, expectResult: expectData{ data: map[string]struct{}{ @@ -648,7 +656,7 @@ func TestCacheResponseForList(t *testing.T) { resolver := newTestRequestInfoResolver() for _, tt := range tests { t.Run(tt.desc, func(t *testing.T) { - encoder, err := serializerM.CreateSerializers(tt.accept, tt.group, tt.version) + encoder, err := serializerM.CreateSerializers(tt.accept, tt.group, tt.version, tt.resource) if err != nil { t.Fatalf("could not create serializer, %v", err) } diff --git a/pkg/yurthub/cachemanager/storage_wrapper.go b/pkg/yurthub/cachemanager/storage_wrapper.go index fdeb9c63eed..20bede82f48 100644 --- a/pkg/yurthub/cachemanager/storage_wrapper.go +++ b/pkg/yurthub/cachemanager/storage_wrapper.go @@ -20,9 +20,10 @@ import ( "bytes" "sync" + "github.com/openyurtio/openyurt/pkg/yurthub/storage" "github.com/openyurtio/openyurt/pkg/yurthub/util" - "github.com/openyurtio/openyurt/pkg/yurthub/storage" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/serializer/json" "k8s.io/client-go/kubernetes/scheme" @@ -53,7 +54,7 @@ type storageWrapper struct { func NewStorageWrapper(storage storage.Store) StorageWrapper { return &storageWrapper{ store: storage, - backendSerializer: json.NewSerializer(json.DefaultMetaFactory, scheme.Scheme, scheme.Scheme, false), + backendSerializer: json.NewSerializerWithOptions(json.DefaultMetaFactory, scheme.Scheme, scheme.Scheme, json.SerializerOptions{}), cache: make(map[string]runtime.Object), } } @@ -113,8 +114,18 @@ func (sw *storageWrapper) Get(key string) (runtime.Object, error) { } else if len(b) == 0 { return nil, nil } - - obj, gvk, err := sw.backendSerializer.Decode(b, nil, nil) + //get the gvk from json data + gvk, err := json.DefaultMetaFactory.Interpret(b) + if err != nil { + return nil, err + } + var UnstructuredObj runtime.Object + if scheme.Scheme.Recognizes(*gvk) { + UnstructuredObj = nil + } else { + UnstructuredObj = new(unstructured.Unstructured) + } + obj, gvk, err := sw.backendSerializer.Decode(b, nil, UnstructuredObj) if err != nil { klog.Errorf("could not decode %v for %s, %v", gvk, key, err) return nil, err @@ -143,9 +154,19 @@ func (sw *storageWrapper) List(key string) ([]runtime.Object, error) { } else if len(bb) == 0 { return objects, nil } - + //get the gvk from json data + gvk, err := json.DefaultMetaFactory.Interpret(bb[0]) + if err != nil { + return nil, err + } + var UnstructuredObj runtime.Object + if scheme.Scheme.Recognizes(*gvk) { + UnstructuredObj = nil + } else { + UnstructuredObj = new(unstructured.Unstructured) + } for i := range bb { - obj, gvk, err := sw.backendSerializer.Decode(bb[i], nil, nil) + obj, gvk, err := sw.backendSerializer.Decode(bb[i], nil, UnstructuredObj) if err != nil { klog.Errorf("could not decode %v for %s, %v", gvk, key, err) continue diff --git a/pkg/yurthub/kubernetes/serializer/serializer.go b/pkg/yurthub/kubernetes/serializer/serializer.go index c901d4a1840..da766f9428c 100644 --- a/pkg/yurthub/kubernetes/serializer/serializer.go +++ b/pkg/yurthub/kubernetes/serializer/serializer.go @@ -20,12 +20,18 @@ import ( "fmt" "io" "mime" + "strings" + "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured/unstructuredscheme" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/serializer" + "k8s.io/apimachinery/pkg/runtime/serializer/json" "k8s.io/apimachinery/pkg/runtime/serializer/streaming" + "k8s.io/apimachinery/pkg/runtime/serializer/versioning" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" restclientwatch "k8s.io/client-go/rest/watch" @@ -35,30 +41,173 @@ import ( // YurtHubSerializer is a global serializer manager for yurthub var YurtHubSerializer = NewSerializerManager() +// UnsafeDefaultRESTMapper is used to check whether the GVK is in the scheme according to the GVR information +var UnsafeDefaultRESTMapper = NewDefaultRESTMapperFromScheme() + +func NewDefaultRESTMapperFromScheme() *meta.DefaultRESTMapper { + scheme := scheme.Scheme + defaultGroupVersions := scheme.PrioritizedVersionsAllGroups() + mapper := meta.NewDefaultRESTMapper(defaultGroupVersions) + // enumerate all supported versions, get the kinds, and register with the mapper how to address + // our resources. + for _, gv := range defaultGroupVersions { + for kind := range scheme.KnownTypes(gv) { + scope := meta.RESTScopeNamespace + mapper.Add(gv.WithKind(kind), scope) + } + } + return mapper +} + // SerializerManager is responsible for managing *rest.Serializers type SerializerManager struct { // NegotiatedSerializer is used for obtaining encoders and decoders for multiple // supported media types. NegotiatedSerializer runtime.NegotiatedSerializer + // UnstructuredNegotiatedSerializer is used to obtain encoders and decoders + // for resources not registered in the scheme + UnstructuredNegotiatedSerializer runtime.NegotiatedSerializer } // NewSerializerManager creates a *SerializerManager object with no version conversion func NewSerializerManager() *SerializerManager { return &SerializerManager{ - NegotiatedSerializer: serializer.WithoutConversionCodecFactory{CodecFactory: scheme.Codecs}, // do not need version conversion + // do not need version conversion, and keep the gvk information + NegotiatedSerializer: WithVersionCodecFactory{CodecFactory: scheme.Codecs}, + UnstructuredNegotiatedSerializer: NewUnstructuredNegotiatedSerializer(), + } +} + +// WithVersionCodecFactory is a CodecFactory that will explicitly ignore requests to perform conversion. +// It keeps the gvk during deserialization. +// This wrapper is used while code migrates away from using conversion (such as external clients) +type WithVersionCodecFactory struct { + serializer.CodecFactory +} + +// EncoderForVersion returns an encoder that does not do conversion, but does set the group version kind of the object +// when serialized. +func (f WithVersionCodecFactory) EncoderForVersion(serializer runtime.Encoder, version runtime.GroupVersioner) runtime.Encoder { + return runtime.WithVersionEncoder{ + Version: version, + Encoder: serializer, + ObjectTyper: scheme.Scheme, + } +} + +// DecoderToVersion returns an decoder that does not do conversion, and keeps the gvk information +func (f WithVersionCodecFactory) DecoderToVersion(serializer runtime.Decoder, _ runtime.GroupVersioner) runtime.Decoder { + return WithVersionDecoder{ + Decoder: serializer, + } +} + +// WithVersionDecoder keeps the group version kind of a deserialized object. +type WithVersionDecoder struct { + runtime.Decoder +} + +// Decode does not do conversion. It keeps the gvk during deserialization. +func (d WithVersionDecoder) Decode(data []byte, defaults *schema.GroupVersionKind, into runtime.Object) (runtime.Object, *schema.GroupVersionKind, error) { + return d.Decoder.Decode(data, defaults, into) +} + +type UnstructuredNegotiatedSerializer struct { + scheme *runtime.Scheme + typer runtime.ObjectTyper + creator runtime.ObjectCreater +} + +// NewUnstructuredNegotiatedSerializer returns a negotiated serializer for Unstructured resources +func NewUnstructuredNegotiatedSerializer() runtime.NegotiatedSerializer { + return UnstructuredNegotiatedSerializer{ + scheme: scheme.Scheme, + typer: unstructuredscheme.NewUnstructuredObjectTyper(), + creator: NewUnstructuredCreator(), + } +} + +func (s UnstructuredNegotiatedSerializer) SupportedMediaTypes() []runtime.SerializerInfo { + return []runtime.SerializerInfo{ + { + MediaType: "application/json", + MediaTypeType: "application", + MediaTypeSubType: "json", + EncodesAsText: true, + Serializer: json.NewSerializerWithOptions(json.DefaultMetaFactory, s.creator, s.typer, json.SerializerOptions{}), + PrettySerializer: json.NewSerializerWithOptions(json.DefaultMetaFactory, s.creator, s.typer, json.SerializerOptions{Pretty: true}), + StreamSerializer: &runtime.StreamSerializerInfo{ + EncodesAsText: true, + Serializer: json.NewSerializerWithOptions(json.DefaultMetaFactory, s.creator, s.typer, json.SerializerOptions{}), + Framer: json.Framer, + }, + }, + { + MediaType: "application/yaml", + MediaTypeType: "application", + MediaTypeSubType: "yaml", + EncodesAsText: true, + Serializer: json.NewSerializerWithOptions(json.DefaultMetaFactory, s.creator, s.typer, json.SerializerOptions{Yaml: true}), + }, + } +} + +//EncoderForVersion do nothing, but returns a encoder, +//if the object is unstructured, the encoder will encode object without conversion +func (s UnstructuredNegotiatedSerializer) EncoderForVersion(encoder runtime.Encoder, gv runtime.GroupVersioner) runtime.Encoder { + return versioning.NewDefaultingCodecForScheme(s.scheme, encoder, nil, gv, nil) + +} + +//DecoderToVersion do nothing, and returns a decoder that does not do conversion +func (s UnstructuredNegotiatedSerializer) DecoderToVersion(decoder runtime.Decoder, _ runtime.GroupVersioner) runtime.Decoder { + return WithVersionDecoder{ + Decoder: decoder, + } +} + +type unstructuredCreator struct{} + +// NewUnstructuredCreator returns a simple object creator that always returns an Unstructured or UnstructuredList +func NewUnstructuredCreator() runtime.ObjectCreater { + return unstructuredCreator{} +} + +func (c unstructuredCreator) New(kind schema.GroupVersionKind) (runtime.Object, error) { + if strings.HasSuffix(kind.Kind, "List") { + ret := &unstructured.UnstructuredList{} + ret.SetGroupVersionKind(kind) + return ret, nil + } else { + ret := &unstructured.Unstructured{} + ret.SetGroupVersionKind(kind) + return ret, nil } } // CreateSerializers create a *rest.Serializers for encoding or decoding runtime object -func (sm *SerializerManager) CreateSerializers(contentType, group, version string) (*rest.Serializers, error) { - mediaTypes := sm.NegotiatedSerializer.SupportedMediaTypes() +func (sm *SerializerManager) CreateSerializers(contentType, group, version, resource string) (*rest.Serializers, error) { + var mediaTypes []runtime.SerializerInfo + gvr := schema.GroupVersionResource{ + Group: group, + Version: version, + Resource: resource, + } + _, kindErr := UnsafeDefaultRESTMapper.KindFor(gvr) + if kindErr == nil { + mediaTypes = sm.NegotiatedSerializer.SupportedMediaTypes() + } else { + mediaTypes = sm.UnstructuredNegotiatedSerializer.SupportedMediaTypes() + } mediaType, _, err := mime.ParseMediaType(contentType) if err != nil { return nil, fmt.Errorf("the content type(%s) specified in the request is not recognized: %v", contentType, err) } - info, ok := runtime.SerializerInfoForMediaType(mediaTypes, mediaType) if !ok { + if mediaType == "application/vnd.kubernetes.protobuf" && kindErr != nil { + return nil, fmt.Errorf("*unstructured.Unstructured(%s/%s) does not implement the protobuf marshalling interface and cannot be encoded to a protobuf message", group, version) + } if len(contentType) != 0 || len(mediaTypes) == 0 { return nil, fmt.Errorf("no serializers registered for %s", contentType) } @@ -80,17 +229,30 @@ func (sm *SerializerManager) CreateSerializers(contentType, group, version strin Group: group, Version: version, } + var encoder runtime.Encoder + var decoder runtime.Decoder + if kindErr == nil { + encoder = sm.NegotiatedSerializer.EncoderForVersion(info.Serializer, &reqGroupVersion) + decoder = sm.NegotiatedSerializer.DecoderToVersion(info.Serializer, internalGV) + } else { + encoder = sm.UnstructuredNegotiatedSerializer.EncoderForVersion(info.Serializer, &reqGroupVersion) + decoder = sm.UnstructuredNegotiatedSerializer.DecoderToVersion(info.Serializer, &reqGroupVersion) + } s := &rest.Serializers{ - Encoder: sm.NegotiatedSerializer.EncoderForVersion(info.Serializer, &reqGroupVersion), - Decoder: sm.NegotiatedSerializer.DecoderToVersion(info.Serializer, internalGV), + Encoder: encoder, + Decoder: decoder, RenegotiatedDecoder: func(contentType string, params map[string]string) (runtime.Decoder, error) { info, ok := runtime.SerializerInfoForMediaType(mediaTypes, contentType) if !ok { return nil, fmt.Errorf("serializer for %s not registered", contentType) } - return sm.NegotiatedSerializer.DecoderToVersion(info.Serializer, internalGV), nil + if kindErr == nil { + return sm.NegotiatedSerializer.DecoderToVersion(info.Serializer, internalGV), nil + } else { + return sm.UnstructuredNegotiatedSerializer.DecoderToVersion(info.Serializer, &reqGroupVersion), nil + } }, } if info.StreamSerializer != nil { @@ -129,14 +291,14 @@ func DecodeResp(serializers *rest.Serializers, b []byte, reqContentType, respCon switch out.(type) { case *metav1.Status: // it's not need to cache for status - return nil, nil + return out, nil } return out, nil } // WatchDecoder generates a Decoder for watch response -func WatchDecoder(serializers *rest.Serializers, body io.ReadCloser) (*restclientwatch.Decoder, error) { +func WatchDecoder(serializers *rest.Serializers, embeddedSerializers *rest.Serializers, body io.ReadCloser) (*restclientwatch.Decoder, error) { framer := serializers.Framer.NewFrameReader(body) streamingDecoder := streaming.NewDecoder(framer, serializers.StreamingSerializer) - return restclientwatch.NewDecoder(streamingDecoder, serializers.Decoder), nil + return restclientwatch.NewDecoder(streamingDecoder, embeddedSerializers.Decoder), nil } diff --git a/pkg/yurthub/proxy/local/local_test.go b/pkg/yurthub/proxy/local/local_test.go index d35c35c821b..84bc8b0c68e 100644 --- a/pkg/yurthub/proxy/local/local_test.go +++ b/pkg/yurthub/proxy/local/local_test.go @@ -371,6 +371,7 @@ func TestServeHTTPForGetReqCache(t *testing.T) { accept string verb string path string + resource string code int data expectData }{ @@ -391,10 +392,11 @@ func TestServeHTTPForGetReqCache(t *testing.T) { }, }, }, - accept: "application/json", - verb: "GET", - path: "/api/v1/namespaces/default/pods/mypod1", - code: http.StatusOK, + accept: "application/json", + verb: "GET", + path: "/api/v1/namespaces/default/pods/mypod1", + resource: "pods", + code: http.StatusOK, data: expectData{ ns: "default", name: "mypod1", @@ -407,7 +409,7 @@ func TestServeHTTPForGetReqCache(t *testing.T) { for _, tt := range testcases { t.Run(tt.desc, func(t *testing.T) { - jsonDecoder, _ := serializerM.CreateSerializers(tt.accept, "", "v1") + jsonDecoder, _ := serializerM.CreateSerializers(tt.accept, "", "v1", tt.resource) accessor := meta.NewAccessor() for i := range tt.inputObj { name, _ := accessor.Name(tt.inputObj[i]) @@ -495,6 +497,7 @@ func TestServeHTTPForListReqCache(t *testing.T) { accept string verb string path string + resource string code int expectD expectData }{ @@ -537,10 +540,11 @@ func TestServeHTTPForListReqCache(t *testing.T) { }, }, }, - accept: "application/json", - verb: "GET", - path: "/api/v1/namespaces/default/pods", - code: http.StatusOK, + accept: "application/json", + verb: "GET", + path: "/api/v1/namespaces/default/pods", + resource: "pods", + code: http.StatusOK, expectD: expectData{ rv: "6", data: map[string]struct{}{ @@ -556,7 +560,7 @@ func TestServeHTTPForListReqCache(t *testing.T) { for _, tt := range testcases { t.Run(tt.desc, func(t *testing.T) { - jsonDecoder, _ := serializerM.CreateSerializers(tt.accept, "", "v1") + jsonDecoder, _ := serializerM.CreateSerializers(tt.accept, "", "v1", tt.resource) accessor := meta.NewAccessor() for i := range tt.inputObj { name, _ := accessor.Name(tt.inputObj[i])