Skip to content

Commit

Permalink
Enhance Yurthub caching ability
Browse files Browse the repository at this point in the history
  • Loading branch information
qclc committed Mar 15, 2021
1 parent e1c1651 commit 6a95bed
Show file tree
Hide file tree
Showing 5 changed files with 234 additions and 85 deletions.
95 changes: 36 additions & 59 deletions pkg/yurthub/cachemanager/cache_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"os"
"path"
"strconv"
"strings"
"sync"

"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer"
Expand All @@ -33,6 +34,7 @@ import (

"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
Expand All @@ -42,40 +44,6 @@ import (
"k8s.io/klog"
)

var (
resourceToKindMap = map[string]string{
"nodes": "Node",
"pods": "Pod",
"services": "Service",
"namespaces": "Namespace",
"endpoints": "Endpoints",
"configmaps": "ConfigMap",
"persistentvolumes": "PersistentVolume",
"persistentvolumeclaims": "PersistentVolumeClaim",
"events": "Event",
"secrets": "Secret",
"leases": "Lease",
"runtimeclasses": "RuntimeClass",
"csidrivers": "CSIDriver",
}

resourceToListKindMap = map[string]string{
"nodes": "NodeList",
"pods": "PodList",
"services": "ServiceList",
"namespaces": "NamespaceList",
"endpoints": "EndpointsList",
"configmaps": "ConfigMapList",
"persistentvolumes": "PersistentVolumeList",
"persistentvolumeclaims": "PersistentVolumeClaimList",
"events": "EventList",
"secrets": "SecretList",
"leases": "LeaseList",
"runtimeclasses": "RuntimeClassList",
"csidrivers": "CSIDriverList",
}
)

// CacheManager is an adaptor to cache runtime object data into backend storage
type CacheManager interface {
CacheResponse(ctx context.Context, prc io.ReadCloser, stopCh <-chan struct{}) error
Expand Down Expand Up @@ -169,26 +137,37 @@ func (cm *cacheManager) queryListObject(req *http.Request) (runtime.Object, erro
comp, _ := util.ClientComponentFrom(ctx)
info, _ := apirequest.RequestInfoFrom(ctx)

listKind := resourceToListKindMap[info.Resource]
listGvk := schema.GroupVersionKind{
Group: info.APIGroup,
Version: info.APIVersion,
Kind: listKind,
}

listObj, err := scheme.Scheme.New(listGvk)
key, err := util.KeyFunc(comp, info.Resource, info.Namespace, info.Name)
if err != nil {
klog.Errorf("failed to create list object(%v), %v", listGvk, err)
return nil, err
}

key, err := util.KeyFunc(comp, info.Resource, info.Namespace, info.Name)
objs, err := cm.storage.List(key)
if err != nil {
return nil, err
}

objs, err := cm.storage.List(key)
var listKind string
if len(objs) == 0 {
klog.Errorf("no %s in cache", info.Resource)
return nil, err
} else {
listKind = objs[0].GetObjectKind().GroupVersionKind().Kind + "List"
}
var listObj runtime.Object
listGvk := schema.GroupVersionKind{
Group: info.APIGroup,
Version: info.APIVersion,
Kind: listKind,
}
if scheme.Scheme.Recognizes(listGvk) {
listObj, err = scheme.Scheme.New(listGvk)
} else {
tmp := new(unstructured.UnstructuredList)
tmp.SetGroupVersionKind(listGvk)
listObj = tmp
}
if err != nil {
klog.Errorf("failed to create list object(%v), %v", listGvk, err)
return nil, err
}

Expand Down Expand Up @@ -248,20 +227,25 @@ func (cm *cacheManager) saveWatchObject(ctx context.Context, info *apirequest.Re

comp, _ := util.ClientComponentFrom(ctx)
reqContentType, _ := util.ReqContentTypeFrom(ctx)
serializers, err := cm.serializerManager.CreateSerializers(reqContentType, info.APIGroup, info.APIVersion)
serializers, err := cm.serializerManager.CreateSerializers(reqContentType, info.APIGroup, info.APIVersion, "api")
if err != nil {
klog.Errorf("failed to create serializers in saveWatchObject, %v", err)
return err
}

embeddedSerializers, err := cm.serializerManager.CreateSerializers(reqContentType, info.APIGroup, info.APIVersion, info.APIPrefix)
if err != nil {
klog.Errorf("failed to create serializers in saveWatchObject, %v", err)
return err
}

kind := resourceToKindMap[info.Resource]
apiVersion := schema.GroupVersion{
Group: info.APIGroup,
Version: info.APIVersion,
}.String()
accessor := meta.NewAccessor()

d, err := serializer.WatchDecoder(serializers, r)
d, err := serializer.WatchDecoder(serializers, embeddedSerializers, r)
if err != nil {
klog.Errorf("saveWatchObject ended with error, %v", err)
return err
Expand Down Expand Up @@ -301,7 +285,6 @@ func (cm *cacheManager) saveWatchObject(ctx context.Context, info *apirequest.Re
switch watchType {
case watch.Added, watch.Modified:
accessor.SetAPIVersion(obj, apiVersion)
accessor.SetKind(obj, kind)
err = cm.saveOneObjectWithValidation(key, obj)
if watchType == watch.Added {
addObjCnt++
Expand Down Expand Up @@ -336,7 +319,7 @@ func (cm *cacheManager) saveWatchObject(ctx context.Context, info *apirequest.Re
func (cm *cacheManager) saveListObject(ctx context.Context, info *apirequest.RequestInfo, b []byte) error {
reqContentType, _ := util.ReqContentTypeFrom(ctx)
respContentType, _ := util.RespContentTypeFrom(ctx)
serializers, err := cm.serializerManager.CreateSerializers(reqContentType, info.APIGroup, info.APIVersion)
serializers, err := cm.serializerManager.CreateSerializers(reqContentType, info.APIGroup, info.APIVersion, info.APIPrefix)
if err != nil {
klog.Errorf("failed to create serializers in saveListObject, %v", err)
return err
Expand All @@ -362,7 +345,7 @@ func (cm *cacheManager) saveListObject(ctx context.Context, info *apirequest.Req
}
klog.V(5).Infof("list items for %s is: %d", util.ReqInfoString(info), len(items))

kind := resourceToKindMap[info.Resource]
kind := strings.TrimSuffix(list.GetObjectKind().GroupVersionKind().Kind, "List")
apiVersion := schema.GroupVersion{
Group: info.APIGroup,
Version: info.APIVersion,
Expand Down Expand Up @@ -415,7 +398,7 @@ func (cm *cacheManager) saveOneObject(ctx context.Context, info *apirequest.Requ
reqContentType, _ := util.ReqContentTypeFrom(ctx)
respContentType, _ := util.RespContentTypeFrom(ctx)

serializers, err := cm.serializerManager.CreateSerializers(reqContentType, info.APIGroup, info.APIVersion)
serializers, err := cm.serializerManager.CreateSerializers(reqContentType, info.APIGroup, info.APIVersion, info.APIPrefix)
if err != nil {
klog.Errorf("failed to create serializers in saveOneObject: %s, %v", util.ReqInfoString(info), err)
return err
Expand All @@ -436,13 +419,11 @@ func (cm *cacheManager) saveOneObject(ctx context.Context, info *apirequest.Requ
return nil
}

kind := resourceToKindMap[info.Resource]
apiVersion := schema.GroupVersion{
Group: info.APIGroup,
Version: info.APIVersion,
}.String()

accessor.SetKind(obj, kind)
accessor.SetAPIVersion(obj, apiVersion)
}

Expand Down Expand Up @@ -574,9 +555,5 @@ func (cm *cacheManager) CanCacheFor(req *http.Request) bool {
return false
}

if _, ok := resourceToKindMap[info.Resource]; !ok {
return false
}

return true
}
12 changes: 10 additions & 2 deletions pkg/yurthub/cachemanager/cache_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ func TestCacheResponse(t *testing.T) {
accept string
verb string
path string
apiPrefix string
namespaced bool
expectResult expectData
}{
Expand All @@ -93,6 +94,7 @@ func TestCacheResponse(t *testing.T) {
accept: "application/json",
verb: "GET",
path: "/api/v1/namespaces/default/pods/mypod1",
apiPrefix: "api",
namespaced: true,
expectResult: expectData{
rv: "1",
Expand Down Expand Up @@ -121,6 +123,7 @@ func TestCacheResponse(t *testing.T) {
accept: "application/json",
verb: "GET",
path: "/api/v1/namespaces/default/pods/mypod2",
apiPrefix: "api",
namespaced: true,
expectResult: expectData{
rv: "3",
Expand Down Expand Up @@ -148,6 +151,7 @@ func TestCacheResponse(t *testing.T) {
accept: "application/json",
verb: "GET",
path: "/api/v1/nodes/mynode1",
apiPrefix: "api",
namespaced: false,
expectResult: expectData{
rv: "4",
Expand All @@ -174,6 +178,7 @@ func TestCacheResponse(t *testing.T) {
accept: "application/json",
verb: "GET",
path: "/api/v1/nodes/mynode2",
apiPrefix: "api",
namespaced: false,
expectResult: expectData{
rv: "6",
Expand All @@ -187,7 +192,7 @@ func TestCacheResponse(t *testing.T) {
resolver := newTestRequestInfoResolver()
for _, tt := range tests {
t.Run(tt.desc, func(t *testing.T) {
encoder, err := serializerM.CreateSerializers(tt.accept, tt.group, tt.version)
encoder, err := serializerM.CreateSerializers(tt.accept, tt.group, tt.version, tt.apiPrefix)
if err != nil {
t.Fatalf("could not create serializer, %v", err)
}
Expand Down Expand Up @@ -503,6 +508,7 @@ func TestCacheResponseForList(t *testing.T) {
accept string
verb string
path string
apiPrefix string
namespaced bool
expectResult expectData
}{
Expand Down Expand Up @@ -561,6 +567,7 @@ func TestCacheResponseForList(t *testing.T) {
accept: "application/json",
verb: "GET",
path: "/api/v1/namespaces/default/pods",
apiPrefix: "api",
namespaced: true,
expectResult: expectData{
data: map[string]struct{}{
Expand Down Expand Up @@ -632,6 +639,7 @@ func TestCacheResponseForList(t *testing.T) {
accept: "application/json",
verb: "GET",
path: "/api/v1/nodes",
apiPrefix: "api",
namespaced: false,
expectResult: expectData{
data: map[string]struct{}{
Expand All @@ -648,7 +656,7 @@ func TestCacheResponseForList(t *testing.T) {
resolver := newTestRequestInfoResolver()
for _, tt := range tests {
t.Run(tt.desc, func(t *testing.T) {
encoder, err := serializerM.CreateSerializers(tt.accept, tt.group, tt.version)
encoder, err := serializerM.CreateSerializers(tt.accept, tt.group, tt.version, tt.apiPrefix)
if err != nil {
t.Fatalf("could not create serializer, %v", err)
}
Expand Down
32 changes: 27 additions & 5 deletions pkg/yurthub/cachemanager/storage_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ import (
"bytes"
"sync"

"github.com/openyurtio/openyurt/pkg/yurthub/storage"
"github.com/openyurtio/openyurt/pkg/yurthub/util"

"github.com/openyurtio/openyurt/pkg/yurthub/storage"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer/json"
"k8s.io/client-go/kubernetes/scheme"
Expand Down Expand Up @@ -53,7 +54,7 @@ type storageWrapper struct {
func NewStorageWrapper(storage storage.Store) StorageWrapper {
return &storageWrapper{
store: storage,
backendSerializer: json.NewSerializer(json.DefaultMetaFactory, scheme.Scheme, scheme.Scheme, false),
backendSerializer: json.NewSerializerWithOptions(json.DefaultMetaFactory, scheme.Scheme, scheme.Scheme, json.SerializerOptions{}),
cache: make(map[string]runtime.Object),
}
}
Expand Down Expand Up @@ -113,8 +114,18 @@ func (sw *storageWrapper) Get(key string) (runtime.Object, error) {
} else if len(b) == 0 {
return nil, nil
}

obj, gvk, err := sw.backendSerializer.Decode(b, nil, nil)
//get the gvk from json data
gvk, err := json.DefaultMetaFactory.Interpret(b)
if err != nil {
return nil, err
}
var UnstructuredObj runtime.Object
if scheme.Scheme.Recognizes(*gvk) {
UnstructuredObj = nil
} else {
UnstructuredObj = new(unstructured.Unstructured)
}
obj, gvk, err := sw.backendSerializer.Decode(b, nil, UnstructuredObj)
if err != nil {
klog.Errorf("could not decode %v for %s, %v", gvk, key, err)
return nil, err
Expand Down Expand Up @@ -145,7 +156,18 @@ func (sw *storageWrapper) List(key string) ([]runtime.Object, error) {
}

for i := range bb {
obj, gvk, err := sw.backendSerializer.Decode(bb[i], nil, nil)
//get the gvk from json data
gvk, err := json.DefaultMetaFactory.Interpret(bb[i])
if err != nil {
return nil, err
}
var UnstructuredObj runtime.Object
if scheme.Scheme.Recognizes(*gvk) {
UnstructuredObj = nil
} else {
UnstructuredObj = new(unstructured.Unstructured)
}
obj, gvk, err := sw.backendSerializer.Decode(bb[i], nil, UnstructuredObj)
if err != nil {
klog.Errorf("could not decode %v for %s, %v", gvk, key, err)
continue
Expand Down
Loading

0 comments on commit 6a95bed

Please sign in to comment.