Skip to content

Commit

Permalink
Add WithVersionCodecFactory
Browse files Browse the repository at this point in the history
  • Loading branch information
qclc committed Feb 23, 2021
1 parent 4d7463a commit 9aaa238
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 57 deletions.
67 changes: 13 additions & 54 deletions pkg/yurthub/cachemanager/cache_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"os"
"path"
"strconv"
"strings"
"sync"

"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer"
Expand All @@ -42,40 +43,6 @@ import (
"k8s.io/klog"
)

var (
resourceToKindMap = map[string]string{
"nodes": "Node",
"pods": "Pod",
"services": "Service",
"namespaces": "Namespace",
"endpoints": "Endpoints",
"configmaps": "ConfigMap",
"persistentvolumes": "PersistentVolume",
"persistentvolumeclaims": "PersistentVolumeClaim",
"events": "Event",
"secrets": "Secret",
"leases": "Lease",
"runtimeclasses": "RuntimeClass",
"csidrivers": "CSIDriver",
}

resourceToListKindMap = map[string]string{
"nodes": "NodeList",
"pods": "PodList",
"services": "ServiceList",
"namespaces": "NamespaceList",
"endpoints": "EndpointsList",
"configmaps": "ConfigMapList",
"persistentvolumes": "PersistentVolumeList",
"persistentvolumeclaims": "PersistentVolumeClaimList",
"events": "EventList",
"secrets": "SecretList",
"leases": "LeaseList",
"runtimeclasses": "RuntimeClassList",
"csidrivers": "CSIDriverList",
}
)

// CacheManager is an adaptor to cache runtime object data into backend storage
type CacheManager interface {
CacheResponse(ctx context.Context, prc io.ReadCloser, stopCh <-chan struct{}) error
Expand Down Expand Up @@ -169,26 +136,26 @@ func (cm *cacheManager) queryListObject(req *http.Request) (runtime.Object, erro
comp, _ := util.ClientComponentFrom(ctx)
info, _ := apirequest.RequestInfoFrom(ctx)

listKind := resourceToListKindMap[info.Resource]
listGvk := schema.GroupVersionKind{
Group: info.APIGroup,
Version: info.APIVersion,
Kind: listKind,
}

listObj, err := scheme.Scheme.New(listGvk)
key, err := util.KeyFunc(comp, info.Resource, info.Namespace, info.Name)
if err != nil {
klog.Errorf("failed to create list object(%v), %v", listGvk, err)
return nil, err
}

key, err := util.KeyFunc(comp, info.Resource, info.Namespace, info.Name)
objs, err := cm.storage.List(key)
if err != nil {
return nil, err
}

objs, err := cm.storage.List(key)
listKind := objs[0].GetObjectKind().GroupVersionKind().Kind + "List"
listGvk := schema.GroupVersionKind{
Group: info.APIGroup,
Version: info.APIVersion,
Kind: listKind,
}

listObj, err := scheme.Scheme.New(listGvk)
if err != nil {
klog.Errorf("failed to create list object(%v), %v", listGvk, err)
return nil, err
}

Expand Down Expand Up @@ -254,7 +221,6 @@ func (cm *cacheManager) saveWatchObject(ctx context.Context, info *apirequest.Re
return err
}

kind := resourceToKindMap[info.Resource]
apiVersion := schema.GroupVersion{
Group: info.APIGroup,
Version: info.APIVersion,
Expand Down Expand Up @@ -301,7 +267,6 @@ func (cm *cacheManager) saveWatchObject(ctx context.Context, info *apirequest.Re
switch watchType {
case watch.Added, watch.Modified:
accessor.SetAPIVersion(obj, apiVersion)
accessor.SetKind(obj, kind)
err = cm.saveOneObjectWithValidation(key, obj)
if watchType == watch.Added {
addObjCnt++
Expand Down Expand Up @@ -362,7 +327,7 @@ func (cm *cacheManager) saveListObject(ctx context.Context, info *apirequest.Req
}
klog.V(5).Infof("list items for %s is: %d", util.ReqInfoString(info), len(items))

kind := resourceToKindMap[info.Resource]
kind := strings.TrimSuffix(list.GetObjectKind().GroupVersionKind().Kind, "List")
apiVersion := schema.GroupVersion{
Group: info.APIGroup,
Version: info.APIVersion,
Expand Down Expand Up @@ -436,13 +401,11 @@ func (cm *cacheManager) saveOneObject(ctx context.Context, info *apirequest.Requ
return nil
}

kind := resourceToKindMap[info.Resource]
apiVersion := schema.GroupVersion{
Group: info.APIGroup,
Version: info.APIVersion,
}.String()

accessor.SetKind(obj, kind)
accessor.SetAPIVersion(obj, apiVersion)
}

Expand Down Expand Up @@ -574,9 +537,5 @@ func (cm *cacheManager) CanCacheFor(req *http.Request) bool {
return false
}

if _, ok := resourceToKindMap[info.Resource]; !ok {
return false
}

return true
}
4 changes: 2 additions & 2 deletions pkg/yurthub/cachemanager/storage_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ import (
"bytes"
"sync"

"github.com/openyurtio/openyurt/pkg/yurthub/storage"
"github.com/openyurtio/openyurt/pkg/yurthub/util"

"github.com/openyurtio/openyurt/pkg/yurthub/storage"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer/json"
"k8s.io/client-go/kubernetes/scheme"
Expand Down Expand Up @@ -53,7 +53,7 @@ type storageWrapper struct {
func NewStorageWrapper(storage storage.Store) StorageWrapper {
return &storageWrapper{
store: storage,
backendSerializer: json.NewSerializer(json.DefaultMetaFactory, scheme.Scheme, scheme.Scheme, false),
backendSerializer: json.NewSerializerWithOptions(json.DefaultMetaFactory, scheme.Scheme, scheme.Scheme, json.SerializerOptions{}),
cache: make(map[string]runtime.Object),
}
}
Expand Down
39 changes: 38 additions & 1 deletion pkg/yurthub/kubernetes/serializer/serializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,42 @@ import (
// YurtHubSerializer is a global serializer manager for yurthub
var YurtHubSerializer = NewSerializerManager()

// WithVersionCodecFactory is a CodecFactory that will explicitly ignore requests to perform conversion.
// It keeps the gvk during deserialization.
// This wrapper is used while code migrates away from using conversion (such as external clients)
type WithVersionCodecFactory struct {
serializer.CodecFactory
}

// EncoderForVersion returns an encoder that does not do conversion, but does set the group version kind of the object
// when serialized.
func (f WithVersionCodecFactory) EncoderForVersion(serializer runtime.Encoder, version runtime.GroupVersioner) runtime.Encoder {
return runtime.WithVersionEncoder{
Version: version,
Encoder: serializer,
ObjectTyper: scheme.Scheme,
}
}

// DecoderToVersion returns an decoder that does not do conversion, and keeps the gvk information
func (f WithVersionCodecFactory) DecoderToVersion(serializer runtime.Decoder, _ runtime.GroupVersioner) runtime.Decoder {
//return versioning.NewDefaultingCodecForScheme(s.scheme, nil, decoder, nil, gv)
return WithVersionDecoder{
Decoder: serializer,
}
}

// WithVersionDecoder keeps the group version kind of a deserialized object.
type WithVersionDecoder struct {
runtime.Decoder
}

// Decode does not do conversion. It keeps the gvk during deserialization.
func (d WithVersionDecoder) Decode(data []byte, defaults *schema.GroupVersionKind, into runtime.Object) (runtime.Object, *schema.GroupVersionKind, error) {
obj, gvk, err := d.Decoder.Decode(data, defaults, into)
return obj, gvk, err
}

// SerializerManager is responsible for managing *rest.Serializers
type SerializerManager struct {
// NegotiatedSerializer is used for obtaining encoders and decoders for multiple
Expand All @@ -45,7 +81,8 @@ type SerializerManager struct {
// NewSerializerManager creates a *SerializerManager object with no version conversion
func NewSerializerManager() *SerializerManager {
return &SerializerManager{
NegotiatedSerializer: serializer.WithoutConversionCodecFactory{CodecFactory: scheme.Codecs}, // do not need version conversion
// do not need version conversion, and keep the gvk information
NegotiatedSerializer: WithVersionCodecFactory{CodecFactory: scheme.Codecs},
}
}

Expand Down

0 comments on commit 9aaa238

Please sign in to comment.