From 8e464566f0bd0403c3e1c95fe7ce17d64a5aaba6 Mon Sep 17 00:00:00 2001 From: chentao <421224811@qq.com> Date: Mon, 15 Mar 2021 11:43:18 +0800 Subject: [PATCH] Support CR resource --- pkg/yurthub/cachemanager/cache_manager.go | 34 ++++- .../cachemanager/cache_manager_test.go | 12 +- pkg/yurthub/cachemanager/storage_wrapper.go | 28 +++- .../kubernetes/serializer/serializer.go | 139 +++++++++++++++--- pkg/yurthub/proxy/local/local_test.go | 24 +-- 5 files changed, 195 insertions(+), 42 deletions(-) diff --git a/pkg/yurthub/cachemanager/cache_manager.go b/pkg/yurthub/cachemanager/cache_manager.go index 5618c2d0296..76cb460403e 100644 --- a/pkg/yurthub/cachemanager/cache_manager.go +++ b/pkg/yurthub/cachemanager/cache_manager.go @@ -34,6 +34,7 @@ import ( "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/watch" @@ -145,15 +146,26 @@ func (cm *cacheManager) queryListObject(req *http.Request) (runtime.Object, erro if err != nil { return nil, err } - - listKind := objs[0].GetObjectKind().GroupVersionKind().Kind + "List" + var listKind string + if len(objs) == 0 { + klog.Errorf("no %s in cache", info.Resource) + return nil, err + } else { + listKind = objs[0].GetObjectKind().GroupVersionKind().Kind + "List" + } + var listObj runtime.Object listGvk := schema.GroupVersionKind{ Group: info.APIGroup, Version: info.APIVersion, Kind: listKind, } - - listObj, err := scheme.Scheme.New(listGvk) + if scheme.Scheme.Recognizes(listGvk) { + listObj, err = scheme.Scheme.New(listGvk) + } else { + tmp := new(unstructured.UnstructuredList) + tmp.SetGroupVersionKind(listGvk) + listObj = tmp + } if err != nil { klog.Errorf("failed to create list object(%v), %v", listGvk, err) return nil, err @@ -215,7 +227,13 @@ func (cm *cacheManager) saveWatchObject(ctx context.Context, info *apirequest.Re comp, _ := util.ClientComponentFrom(ctx) reqContentType, _ := util.ReqContentTypeFrom(ctx) - serializers, err := cm.serializerManager.CreateSerializers(reqContentType, info.APIGroup, info.APIVersion) + serializers, err := cm.serializerManager.CreateSerializers(reqContentType, info.APIGroup, info.APIVersion, "api") + if err != nil { + klog.Errorf("failed to create serializers in saveWatchObject, %v", err) + return err + } + + embeddedSerializers, err := cm.serializerManager.CreateSerializers(reqContentType, info.APIGroup, info.APIVersion, info.APIPrefix) if err != nil { klog.Errorf("failed to create serializers in saveWatchObject, %v", err) return err @@ -227,7 +245,7 @@ func (cm *cacheManager) saveWatchObject(ctx context.Context, info *apirequest.Re }.String() accessor := meta.NewAccessor() - d, err := serializer.WatchDecoder(serializers, r) + d, err := serializer.WatchDecoder(serializers, embeddedSerializers, r) if err != nil { klog.Errorf("saveWatchObject ended with error, %v", err) return err @@ -301,7 +319,7 @@ func (cm *cacheManager) saveWatchObject(ctx context.Context, info *apirequest.Re func (cm *cacheManager) saveListObject(ctx context.Context, info *apirequest.RequestInfo, b []byte) error { reqContentType, _ := util.ReqContentTypeFrom(ctx) respContentType, _ := util.RespContentTypeFrom(ctx) - serializers, err := cm.serializerManager.CreateSerializers(reqContentType, info.APIGroup, info.APIVersion) + serializers, err := cm.serializerManager.CreateSerializers(reqContentType, info.APIGroup, info.APIVersion, info.APIPrefix) if err != nil { klog.Errorf("failed to create serializers in saveListObject, %v", err) return err @@ -380,7 +398,7 @@ func (cm *cacheManager) saveOneObject(ctx context.Context, info *apirequest.Requ reqContentType, _ := util.ReqContentTypeFrom(ctx) respContentType, _ := util.RespContentTypeFrom(ctx) - serializers, err := cm.serializerManager.CreateSerializers(reqContentType, info.APIGroup, info.APIVersion) + serializers, err := cm.serializerManager.CreateSerializers(reqContentType, info.APIGroup, info.APIVersion, info.APIPrefix) if err != nil { klog.Errorf("failed to create serializers in saveOneObject: %s, %v", util.ReqInfoString(info), err) return err diff --git a/pkg/yurthub/cachemanager/cache_manager_test.go b/pkg/yurthub/cachemanager/cache_manager_test.go index 20990be806f..b4fbf77291b 100644 --- a/pkg/yurthub/cachemanager/cache_manager_test.go +++ b/pkg/yurthub/cachemanager/cache_manager_test.go @@ -70,6 +70,7 @@ func TestCacheResponse(t *testing.T) { accept string verb string path string + apiPrefix string namespaced bool expectResult expectData }{ @@ -93,6 +94,7 @@ func TestCacheResponse(t *testing.T) { accept: "application/json", verb: "GET", path: "/api/v1/namespaces/default/pods/mypod1", + apiPrefix: "api", namespaced: true, expectResult: expectData{ rv: "1", @@ -121,6 +123,7 @@ func TestCacheResponse(t *testing.T) { accept: "application/json", verb: "GET", path: "/api/v1/namespaces/default/pods/mypod2", + apiPrefix: "api", namespaced: true, expectResult: expectData{ rv: "3", @@ -148,6 +151,7 @@ func TestCacheResponse(t *testing.T) { accept: "application/json", verb: "GET", path: "/api/v1/nodes/mynode1", + apiPrefix: "api", namespaced: false, expectResult: expectData{ rv: "4", @@ -174,6 +178,7 @@ func TestCacheResponse(t *testing.T) { accept: "application/json", verb: "GET", path: "/api/v1/nodes/mynode2", + apiPrefix: "api", namespaced: false, expectResult: expectData{ rv: "6", @@ -187,7 +192,7 @@ func TestCacheResponse(t *testing.T) { resolver := newTestRequestInfoResolver() for _, tt := range tests { t.Run(tt.desc, func(t *testing.T) { - encoder, err := serializerM.CreateSerializers(tt.accept, tt.group, tt.version) + encoder, err := serializerM.CreateSerializers(tt.accept, tt.group, tt.version, tt.apiPrefix) if err != nil { t.Fatalf("could not create serializer, %v", err) } @@ -503,6 +508,7 @@ func TestCacheResponseForList(t *testing.T) { accept string verb string path string + apiPrefix string namespaced bool expectResult expectData }{ @@ -561,6 +567,7 @@ func TestCacheResponseForList(t *testing.T) { accept: "application/json", verb: "GET", path: "/api/v1/namespaces/default/pods", + apiPrefix: "api", namespaced: true, expectResult: expectData{ data: map[string]struct{}{ @@ -632,6 +639,7 @@ func TestCacheResponseForList(t *testing.T) { accept: "application/json", verb: "GET", path: "/api/v1/nodes", + apiPrefix: "api", namespaced: false, expectResult: expectData{ data: map[string]struct{}{ @@ -648,7 +656,7 @@ func TestCacheResponseForList(t *testing.T) { resolver := newTestRequestInfoResolver() for _, tt := range tests { t.Run(tt.desc, func(t *testing.T) { - encoder, err := serializerM.CreateSerializers(tt.accept, tt.group, tt.version) + encoder, err := serializerM.CreateSerializers(tt.accept, tt.group, tt.version, tt.apiPrefix) if err != nil { t.Fatalf("could not create serializer, %v", err) } diff --git a/pkg/yurthub/cachemanager/storage_wrapper.go b/pkg/yurthub/cachemanager/storage_wrapper.go index e2c4c4645fe..95811c9d0f3 100644 --- a/pkg/yurthub/cachemanager/storage_wrapper.go +++ b/pkg/yurthub/cachemanager/storage_wrapper.go @@ -23,6 +23,7 @@ import ( "github.com/openyurtio/openyurt/pkg/yurthub/storage" "github.com/openyurtio/openyurt/pkg/yurthub/util" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/serializer/json" "k8s.io/client-go/kubernetes/scheme" @@ -113,8 +114,18 @@ func (sw *storageWrapper) Get(key string) (runtime.Object, error) { } else if len(b) == 0 { return nil, nil } - - obj, gvk, err := sw.backendSerializer.Decode(b, nil, nil) + //get the gvk from json data + gvk, err := json.DefaultMetaFactory.Interpret(b) + if err != nil { + return nil, err + } + var UnstructuredObj runtime.Object + if scheme.Scheme.Recognizes(*gvk) { + UnstructuredObj = nil + } else { + UnstructuredObj = new(unstructured.Unstructured) + } + obj, gvk, err := sw.backendSerializer.Decode(b, nil, UnstructuredObj) if err != nil { klog.Errorf("could not decode %v for %s, %v", gvk, key, err) return nil, err @@ -145,7 +156,18 @@ func (sw *storageWrapper) List(key string) ([]runtime.Object, error) { } for i := range bb { - obj, gvk, err := sw.backendSerializer.Decode(bb[i], nil, nil) + //get the gvk from json data + gvk, err := json.DefaultMetaFactory.Interpret(bb[i]) + if err != nil { + return nil, err + } + var UnstructuredObj runtime.Object + if scheme.Scheme.Recognizes(*gvk) { + UnstructuredObj = nil + } else { + UnstructuredObj = new(unstructured.Unstructured) + } + obj, gvk, err := sw.backendSerializer.Decode(bb[i], nil, UnstructuredObj) if err != nil { klog.Errorf("could not decode %v for %s, %v", gvk, key, err) continue diff --git a/pkg/yurthub/kubernetes/serializer/serializer.go b/pkg/yurthub/kubernetes/serializer/serializer.go index 6c5b8201c7e..08280ce4607 100644 --- a/pkg/yurthub/kubernetes/serializer/serializer.go +++ b/pkg/yurthub/kubernetes/serializer/serializer.go @@ -20,12 +20,17 @@ import ( "fmt" "io" "mime" + "strings" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured/unstructuredscheme" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/serializer" + "k8s.io/apimachinery/pkg/runtime/serializer/json" "k8s.io/apimachinery/pkg/runtime/serializer/streaming" + "k8s.io/apimachinery/pkg/runtime/serializer/versioning" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" restclientwatch "k8s.io/client-go/rest/watch" @@ -35,6 +40,25 @@ import ( // YurtHubSerializer is a global serializer manager for yurthub var YurtHubSerializer = NewSerializerManager() +// SerializerManager is responsible for managing *rest.Serializers +type SerializerManager struct { + // NegotiatedSerializer is used for obtaining encoders and decoders for multiple + // supported media types. + NegotiatedSerializer runtime.NegotiatedSerializer + // UnstructuredNegotiatedSerializer is used to obtain encoders and decoders + // for resources not registered in the scheme + UnstructuredNegotiatedSerializer runtime.NegotiatedSerializer +} + +// NewSerializerManager creates a *SerializerManager object with no version conversion +func NewSerializerManager() *SerializerManager { + return &SerializerManager{ + // do not need version conversion, and keep the gvk information + NegotiatedSerializer: WithVersionCodecFactory{CodecFactory: scheme.Codecs}, + UnstructuredNegotiatedSerializer: NewUnstructuredNegotiatedSerializer(), + } +} + // WithVersionCodecFactory is a CodecFactory that will explicitly ignore requests to perform conversion. // It keeps the gvk during deserialization. // This wrapper is used while code migrates away from using conversion (such as external clients) @@ -54,7 +78,6 @@ func (f WithVersionCodecFactory) EncoderForVersion(serializer runtime.Encoder, v // DecoderToVersion returns an decoder that does not do conversion, and keeps the gvk information func (f WithVersionCodecFactory) DecoderToVersion(serializer runtime.Decoder, _ runtime.GroupVersioner) runtime.Decoder { - //return versioning.NewDefaultingCodecForScheme(s.scheme, nil, decoder, nil, gv) return WithVersionDecoder{ Decoder: serializer, } @@ -71,31 +94,96 @@ func (d WithVersionDecoder) Decode(data []byte, defaults *schema.GroupVersionKin return obj, gvk, err } -// SerializerManager is responsible for managing *rest.Serializers -type SerializerManager struct { - // NegotiatedSerializer is used for obtaining encoders and decoders for multiple - // supported media types. - NegotiatedSerializer runtime.NegotiatedSerializer +type UnstructuredNegotiatedSerializer struct { + scheme *runtime.Scheme + typer runtime.ObjectTyper + creator runtime.ObjectCreater } -// NewSerializerManager creates a *SerializerManager object with no version conversion -func NewSerializerManager() *SerializerManager { - return &SerializerManager{ - // do not need version conversion, and keep the gvk information - NegotiatedSerializer: WithVersionCodecFactory{CodecFactory: scheme.Codecs}, +// NewUnstructuredNegotiatedSerializer returns a negotiated serializer for Unstructured resources +func NewUnstructuredNegotiatedSerializer() runtime.NegotiatedSerializer { + return UnstructuredNegotiatedSerializer{ + scheme: scheme.Scheme, + typer: unstructuredscheme.NewUnstructuredObjectTyper(), + creator: NewUnstructuredCreator(), + } +} + +func (s UnstructuredNegotiatedSerializer) SupportedMediaTypes() []runtime.SerializerInfo { + return []runtime.SerializerInfo{ + { + MediaType: "application/json", + MediaTypeType: "application", + MediaTypeSubType: "json", + EncodesAsText: true, + Serializer: json.NewSerializerWithOptions(json.DefaultMetaFactory, s.creator, s.typer, json.SerializerOptions{}), + PrettySerializer: json.NewSerializerWithOptions(json.DefaultMetaFactory, s.creator, s.typer, json.SerializerOptions{Pretty: true}), + StreamSerializer: &runtime.StreamSerializerInfo{ + EncodesAsText: true, + Serializer: json.NewSerializerWithOptions(json.DefaultMetaFactory, s.creator, s.typer, json.SerializerOptions{}), + Framer: json.Framer, + }, + }, + { + MediaType: "application/yaml", + MediaTypeType: "application", + MediaTypeSubType: "yaml", + EncodesAsText: true, + Serializer: json.NewSerializerWithOptions(json.DefaultMetaFactory, s.creator, s.typer, json.SerializerOptions{Yaml: true}), + }, + } +} + +//EncoderForVersion do nothing, but returns a encoder, +//if the object is unstructured, the encoder will encode object without conversion +func (s UnstructuredNegotiatedSerializer) EncoderForVersion(encoder runtime.Encoder, gv runtime.GroupVersioner) runtime.Encoder { + return versioning.NewDefaultingCodecForScheme(s.scheme, encoder, nil, gv, nil) + +} + +//DecoderToVersion do nothing, and returns a decoder that does not do conversion +func (s UnstructuredNegotiatedSerializer) DecoderToVersion(decoder runtime.Decoder, _ runtime.GroupVersioner) runtime.Decoder { + return WithVersionDecoder{ + Decoder: decoder, + } +} + +type unstructuredCreator struct{} + +// NewUnstructuredCreator returns a simple object creator that always returns an Unstructured or UnstructuredList +func NewUnstructuredCreator() runtime.ObjectCreater { + return unstructuredCreator{} +} + +func (c unstructuredCreator) New(kind schema.GroupVersionKind) (runtime.Object, error) { + if strings.HasSuffix(kind.Kind, "List") { + ret := &unstructured.UnstructuredList{} + ret.SetGroupVersionKind(kind) + return ret, nil + } else { + ret := &unstructured.Unstructured{} + ret.SetGroupVersionKind(kind) + return ret, nil } } // CreateSerializers create a *rest.Serializers for encoding or decoding runtime object -func (sm *SerializerManager) CreateSerializers(contentType, group, version string) (*rest.Serializers, error) { - mediaTypes := sm.NegotiatedSerializer.SupportedMediaTypes() +func (sm *SerializerManager) CreateSerializers(contentType, group, version, apiPrefix string) (*rest.Serializers, error) { + var mediaTypes []runtime.SerializerInfo + if apiPrefix == "api" { + mediaTypes = sm.NegotiatedSerializer.SupportedMediaTypes() + } else { + mediaTypes = sm.UnstructuredNegotiatedSerializer.SupportedMediaTypes() + } mediaType, _, err := mime.ParseMediaType(contentType) if err != nil { return nil, fmt.Errorf("the content type(%s) specified in the request is not recognized: %v", contentType, err) } - info, ok := runtime.SerializerInfoForMediaType(mediaTypes, mediaType) if !ok { + if mediaType == "application/vnd.kubernetes.protobuf" && apiPrefix != "api" { + return nil, fmt.Errorf("*unstructured.Unstructured(%s/%s) does not implement the protobuf marshalling interface and cannot be encoded to a protobuf message", group, version) + } if len(contentType) != 0 || len(mediaTypes) == 0 { return nil, fmt.Errorf("no serializers registered for %s", contentType) } @@ -117,17 +205,30 @@ func (sm *SerializerManager) CreateSerializers(contentType, group, version strin Group: group, Version: version, } + var encoder runtime.Encoder + var decoder runtime.Decoder + if apiPrefix == "api" { + encoder = sm.NegotiatedSerializer.EncoderForVersion(info.Serializer, &reqGroupVersion) + decoder = sm.NegotiatedSerializer.DecoderToVersion(info.Serializer, internalGV) + } else { + encoder = sm.UnstructuredNegotiatedSerializer.EncoderForVersion(info.Serializer, &reqGroupVersion) + decoder = sm.UnstructuredNegotiatedSerializer.DecoderToVersion(info.Serializer, &reqGroupVersion) + } s := &rest.Serializers{ - Encoder: sm.NegotiatedSerializer.EncoderForVersion(info.Serializer, &reqGroupVersion), - Decoder: sm.NegotiatedSerializer.DecoderToVersion(info.Serializer, internalGV), + Encoder: encoder, + Decoder: decoder, RenegotiatedDecoder: func(contentType string, params map[string]string) (runtime.Decoder, error) { info, ok := runtime.SerializerInfoForMediaType(mediaTypes, contentType) if !ok { return nil, fmt.Errorf("serializer for %s not registered", contentType) } - return sm.NegotiatedSerializer.DecoderToVersion(info.Serializer, internalGV), nil + if apiPrefix == "api" { + return sm.NegotiatedSerializer.DecoderToVersion(info.Serializer, internalGV), nil + } else { + return sm.UnstructuredNegotiatedSerializer.DecoderToVersion(info.Serializer, &reqGroupVersion), nil + } }, } if info.StreamSerializer != nil { @@ -172,8 +273,8 @@ func DecodeResp(serializers *rest.Serializers, b []byte, reqContentType, respCon } // WatchDecoder generates a Decoder for watch response -func WatchDecoder(serializers *rest.Serializers, body io.ReadCloser) (*restclientwatch.Decoder, error) { +func WatchDecoder(serializers *rest.Serializers, embeddedSerializers *rest.Serializers, body io.ReadCloser) (*restclientwatch.Decoder, error) { framer := serializers.Framer.NewFrameReader(body) streamingDecoder := streaming.NewDecoder(framer, serializers.StreamingSerializer) - return restclientwatch.NewDecoder(streamingDecoder, serializers.Decoder), nil + return restclientwatch.NewDecoder(streamingDecoder, embeddedSerializers.Decoder), nil } diff --git a/pkg/yurthub/proxy/local/local_test.go b/pkg/yurthub/proxy/local/local_test.go index d35c35c821b..f977adc575a 100644 --- a/pkg/yurthub/proxy/local/local_test.go +++ b/pkg/yurthub/proxy/local/local_test.go @@ -371,6 +371,7 @@ func TestServeHTTPForGetReqCache(t *testing.T) { accept string verb string path string + apiPrefix string code int data expectData }{ @@ -391,10 +392,11 @@ func TestServeHTTPForGetReqCache(t *testing.T) { }, }, }, - accept: "application/json", - verb: "GET", - path: "/api/v1/namespaces/default/pods/mypod1", - code: http.StatusOK, + accept: "application/json", + verb: "GET", + path: "/api/v1/namespaces/default/pods/mypod1", + apiPrefix: "api", + code: http.StatusOK, data: expectData{ ns: "default", name: "mypod1", @@ -407,7 +409,7 @@ func TestServeHTTPForGetReqCache(t *testing.T) { for _, tt := range testcases { t.Run(tt.desc, func(t *testing.T) { - jsonDecoder, _ := serializerM.CreateSerializers(tt.accept, "", "v1") + jsonDecoder, _ := serializerM.CreateSerializers(tt.accept, "", "v1", tt.apiPrefix) accessor := meta.NewAccessor() for i := range tt.inputObj { name, _ := accessor.Name(tt.inputObj[i]) @@ -495,6 +497,7 @@ func TestServeHTTPForListReqCache(t *testing.T) { accept string verb string path string + apiPrefix string code int expectD expectData }{ @@ -537,10 +540,11 @@ func TestServeHTTPForListReqCache(t *testing.T) { }, }, }, - accept: "application/json", - verb: "GET", - path: "/api/v1/namespaces/default/pods", - code: http.StatusOK, + accept: "application/json", + verb: "GET", + path: "/api/v1/namespaces/default/pods", + apiPrefix: "api", + code: http.StatusOK, expectD: expectData{ rv: "6", data: map[string]struct{}{ @@ -556,7 +560,7 @@ func TestServeHTTPForListReqCache(t *testing.T) { for _, tt := range testcases { t.Run(tt.desc, func(t *testing.T) { - jsonDecoder, _ := serializerM.CreateSerializers(tt.accept, "", "v1") + jsonDecoder, _ := serializerM.CreateSerializers(tt.accept, "", "v1", tt.apiPrefix) accessor := meta.NewAccessor() for i := range tt.inputObj { name, _ := accessor.Name(tt.inputObj[i])