Skip to content

Commit

Permalink
Enhance Yurthub caching ability
Browse files Browse the repository at this point in the history
  • Loading branch information
qclc committed Mar 29, 2021
1 parent e1c1651 commit 1301a90
Show file tree
Hide file tree
Showing 6 changed files with 780 additions and 106 deletions.
102 changes: 26 additions & 76 deletions pkg/yurthub/cachemanager/cache_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"os"
"path"
"strconv"
"strings"
"sync"

"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer"
Expand All @@ -33,6 +34,7 @@ import (

"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
Expand All @@ -42,40 +44,6 @@ import (
"k8s.io/klog"
)

var (
resourceToKindMap = map[string]string{
"nodes": "Node",
"pods": "Pod",
"services": "Service",
"namespaces": "Namespace",
"endpoints": "Endpoints",
"configmaps": "ConfigMap",
"persistentvolumes": "PersistentVolume",
"persistentvolumeclaims": "PersistentVolumeClaim",
"events": "Event",
"secrets": "Secret",
"leases": "Lease",
"runtimeclasses": "RuntimeClass",
"csidrivers": "CSIDriver",
}

resourceToListKindMap = map[string]string{
"nodes": "NodeList",
"pods": "PodList",
"services": "ServiceList",
"namespaces": "NamespaceList",
"endpoints": "EndpointsList",
"configmaps": "ConfigMapList",
"persistentvolumes": "PersistentVolumeList",
"persistentvolumeclaims": "PersistentVolumeClaimList",
"events": "EventList",
"secrets": "SecretList",
"leases": "LeaseList",
"runtimeclasses": "RuntimeClassList",
"csidrivers": "CSIDriverList",
}
)

// CacheManager is an adaptor to cache runtime object data into backend storage
type CacheManager interface {
CacheResponse(ctx context.Context, prc io.ReadCloser, stopCh <-chan struct{}) error
Expand All @@ -89,6 +57,7 @@ type cacheManager struct {
sync.RWMutex
storage StorageWrapper
serializerManager *serializer.SerializerManager
handlerLayer *HandlerLayer
cacheAgents map[string]bool
}

Expand All @@ -100,6 +69,7 @@ func NewCacheManager(
cm := &cacheManager{
storage: storage,
serializerManager: serializerMgr,
handlerLayer: NewHandlerLayer(),
cacheAgents: make(map[string]bool),
}

Expand Down Expand Up @@ -169,19 +139,6 @@ func (cm *cacheManager) queryListObject(req *http.Request) (runtime.Object, erro
comp, _ := util.ClientComponentFrom(ctx)
info, _ := apirequest.RequestInfoFrom(ctx)

listKind := resourceToListKindMap[info.Resource]
listGvk := schema.GroupVersionKind{
Group: info.APIGroup,
Version: info.APIVersion,
Kind: listKind,
}

listObj, err := scheme.Scheme.New(listGvk)
if err != nil {
klog.Errorf("failed to create list object(%v), %v", listGvk, err)
return nil, err
}

key, err := util.KeyFunc(comp, info.Resource, info.Namespace, info.Name)
if err != nil {
return nil, err
Expand All @@ -190,6 +147,24 @@ func (cm *cacheManager) queryListObject(req *http.Request) (runtime.Object, erro
objs, err := cm.storage.List(key)
if err != nil {
return nil, err
} else if len(objs) == 0 {
return nil, nil
}
var listObj runtime.Object
listGvk := schema.GroupVersionKind{
Group: info.APIGroup,
Version: info.APIVersion,
Kind: objs[0].GetObjectKind().GroupVersionKind().Kind + "List",
}
if scheme.Scheme.Recognizes(listGvk) {
listObj, err = scheme.Scheme.New(listGvk)
if err != nil {
klog.Errorf("failed to create list object(%v), %v", listGvk, err)
return nil, err
}
} else {
listObj = new(unstructured.UnstructuredList)
listObj.GetObjectKind().SetGroupVersionKind(listGvk)
}

listRv := 0
Expand Down Expand Up @@ -248,20 +223,10 @@ func (cm *cacheManager) saveWatchObject(ctx context.Context, info *apirequest.Re

comp, _ := util.ClientComponentFrom(ctx)
reqContentType, _ := util.ReqContentTypeFrom(ctx)
serializers, err := cm.serializerManager.CreateSerializers(reqContentType, info.APIGroup, info.APIVersion)
if err != nil {
klog.Errorf("failed to create serializers in saveWatchObject, %v", err)
return err
}

kind := resourceToKindMap[info.Resource]
apiVersion := schema.GroupVersion{
Group: info.APIGroup,
Version: info.APIVersion,
}.String()
accessor := meta.NewAccessor()

d, err := serializer.WatchDecoder(serializers, r)
d, err := serializer.CreateWatchDecoder(reqContentType, info.APIGroup, info.APIVersion, info.Resource, r)
if err != nil {
klog.Errorf("saveWatchObject ended with error, %v", err)
return err
Expand Down Expand Up @@ -300,8 +265,6 @@ func (cm *cacheManager) saveWatchObject(ctx context.Context, info *apirequest.Re

switch watchType {
case watch.Added, watch.Modified:
accessor.SetAPIVersion(obj, apiVersion)
accessor.SetKind(obj, kind)
err = cm.saveOneObjectWithValidation(key, obj)
if watchType == watch.Added {
addObjCnt++
Expand Down Expand Up @@ -336,7 +299,7 @@ func (cm *cacheManager) saveWatchObject(ctx context.Context, info *apirequest.Re
func (cm *cacheManager) saveListObject(ctx context.Context, info *apirequest.RequestInfo, b []byte) error {
reqContentType, _ := util.ReqContentTypeFrom(ctx)
respContentType, _ := util.RespContentTypeFrom(ctx)
serializers, err := cm.serializerManager.CreateSerializers(reqContentType, info.APIGroup, info.APIVersion)
serializers, err := cm.serializerManager.CreateSerializers(reqContentType, info.APIGroup, info.APIVersion, info.Resource)
if err != nil {
klog.Errorf("failed to create serializers in saveListObject, %v", err)
return err
Expand All @@ -362,7 +325,7 @@ func (cm *cacheManager) saveListObject(ctx context.Context, info *apirequest.Req
}
klog.V(5).Infof("list items for %s is: %d", util.ReqInfoString(info), len(items))

kind := resourceToKindMap[info.Resource]
kind := strings.TrimSuffix(list.GetObjectKind().GroupVersionKind().Kind, "List")
apiVersion := schema.GroupVersion{
Group: info.APIGroup,
Version: info.APIVersion,
Expand Down Expand Up @@ -415,7 +378,7 @@ func (cm *cacheManager) saveOneObject(ctx context.Context, info *apirequest.Requ
reqContentType, _ := util.ReqContentTypeFrom(ctx)
respContentType, _ := util.RespContentTypeFrom(ctx)

serializers, err := cm.serializerManager.CreateSerializers(reqContentType, info.APIGroup, info.APIVersion)
serializers, err := cm.serializerManager.CreateSerializers(reqContentType, info.APIGroup, info.APIVersion, info.Resource)
if err != nil {
klog.Errorf("failed to create serializers in saveOneObject: %s, %v", util.ReqInfoString(info), err)
return err
Expand All @@ -435,15 +398,6 @@ func (cm *cacheManager) saveOneObject(ctx context.Context, info *apirequest.Requ
// it's not need to cache for status
return nil
}

kind := resourceToKindMap[info.Resource]
apiVersion := schema.GroupVersion{
Group: info.APIGroup,
Version: info.APIVersion,
}.String()

accessor.SetKind(obj, kind)
accessor.SetAPIVersion(obj, apiVersion)
}

var name string
Expand Down Expand Up @@ -574,9 +528,5 @@ func (cm *cacheManager) CanCacheFor(req *http.Request) bool {
return false
}

if _, ok := resourceToKindMap[info.Resource]; !ok {
return false
}

return true
}
Loading

0 comments on commit 1301a90

Please sign in to comment.