refactor cache manager for yurthub
1. replace the local cache as a whole when saving list request responses
2. fix cache conflicts between labelselector/fieldselector lists and full lists.
rambohe-ch committed Apr 20, 2021
1 parent f0b6be1 commit 7c18345
Showing 14 changed files with 881 additions and 548 deletions.
24 changes: 12 additions & 12 deletions Makefile
@@ -1,11 +1,11 @@
# Copyright 2020 The OpenYurt Authors.
#
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#
# http://www.apache.org/licenses/LICENSE-2.0
#
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,16 +17,16 @@
all: test build

# Build binaries in the host environment
build:
build:
bash hack/make-rules/build.sh $(WHAT)

# generate yaml files
# generate yaml files
gen-yaml:
hack/make-rules/genyaml.sh $(WHAT)

# Run test
test: fmt vet
go test ./pkg/... ./cmd/... -coverprofile cover.out
go test -v ./pkg/... ./cmd/... -coverprofile cover.out
go test -v -coverpkg=./pkg/yurttunnel/... -coverprofile=yurttunnel-cover.out ./test/integration/yurttunnel_test.go

# Run go fmt against code
@@ -37,7 +37,7 @@ fmt:
vet:
go vet ./pkg/... ./cmd/...

# Build binaries and docker images.
# Build binaries and docker images.
# NOTE: this rule can take time, as we build binaries inside containers
#
# ARGS:
@@ -47,18 +47,18 @@ vet:
# set it as cn.
#
# Examples:
# # compile yurthub, yurt-controller-manager and yurtctl-servant with
# # compile yurthub, yurt-controller-manager and yurtctl-servant with
# # architectures arm64 and arm in the mainland China
# make release WHAT="yurthub yurt-controller-manager yurtctl-servant" ARCH="arm64 arm" REGION=cn
#
# # compile all components with all architectures (i.e., amd64, arm64, arm)
# make relase
# make relase
release:
bash hack/make-rules/release-images.sh

clean:
clean:
-rm -Rf _output
-rm -Rf dockerbuild

e2e:
hack/make-rules/build-e2e.sh
e2e:
hack/make-rules/build-e2e.sh
36 changes: 28 additions & 8 deletions pkg/yurthub/cachemanager/cache_agent_test.go
@@ -20,12 +20,18 @@ import (
"strings"
"testing"

"github.com/openyurtio/openyurt/pkg/yurthub/storage/disk"

"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/endpoints/request"
)

func TestInitCacheAgents(t *testing.T) {
s := NewFakeStorageWrapper()
dStorage, err := disk.NewDiskStorage(rootDir)
if err != nil {
t.Errorf("failed to create disk storage, %v", err)
}
s := NewStorageWrapper(dStorage)
m, _ := NewCacheManager(s, nil)

// default cache agents in fake store
@@ -62,23 +68,32 @@ func TestInitCacheAgents(t *testing.T) {
if !compareAgents(gotAgents2, m.ListCacheAgents()) {
t.Errorf("Got agents: %v, cache agents map: %v", gotAgents2, m.ListCacheAgents())
}

err = s.Delete(cacheAgentsKey)
if err != nil {
t.Errorf("failed to delete cache agents key, %v", err)
}
}

func TestUpdateCacheAgents(t *testing.T) {
s := NewFakeStorageWrapper()
dStorage, err := disk.NewDiskStorage(rootDir)
if err != nil {
t.Errorf("failed to create disk storage, %v", err)
}
s := NewStorageWrapper(dStorage)
m, _ := NewCacheManager(s, nil)

tests := []struct {
testcases := map[string]struct {
desc string
addAgents []string
expectAgents []string
}{
{desc: "add one agent", addAgents: []string{"agent1"}, expectAgents: append(defaultCacheAgents, "agent1")},
{desc: "update with two agents", addAgents: []string{"agent2", "agent3"}, expectAgents: append(defaultCacheAgents, "agent2", "agent3")},
{desc: "update with more two agents", addAgents: []string{"agent4", "agent5"}, expectAgents: append(defaultCacheAgents, "agent4", "agent5")},
"add one agent": {addAgents: []string{"agent1"}, expectAgents: append(defaultCacheAgents, "agent1")},
"update with two agents": {addAgents: []string{"agent2", "agent3"}, expectAgents: append(defaultCacheAgents, "agent2", "agent3")},
"update with more two agents": {addAgents: []string{"agent4", "agent5"}, expectAgents: append(defaultCacheAgents, "agent4", "agent5")},
}
for _, tt := range tests {
t.Run(tt.desc, func(t *testing.T) {
for k, tt := range testcases {
t.Run(k, func(t *testing.T) {

// add agents
err := m.UpdateCacheAgents(tt.addAgents)
@@ -99,6 +114,11 @@ func TestUpdateCacheAgents(t *testing.T) {
if !compareAgents(gotAgents, m.ListCacheAgents()) {
t.Errorf("Got agents: %v, cache agents map: %v", gotAgents, m.ListCacheAgents())
}

err = s.Delete(cacheAgentsKey)
if err != nil {
t.Errorf("failed to delete cache agents key, %v", err)
}
})
}
}
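
The refactor above also changes the table-test style from a slice of cases with a desc field to a map keyed by the description, so the key doubles as the subtest name. A minimal, self-contained sketch of the pattern (the sum function and its cases are illustrative, not from the commit):

```go
package sum

import "testing"

// sum is a stand-in function under test.
func sum(xs []int) int {
	total := 0
	for _, x := range xs {
		total += x
	}
	return total
}

func TestSum(t *testing.T) {
	// Map-keyed cases: the key is the subtest name, so no desc field is needed.
	testcases := map[string]struct {
		input  []int
		expect int
	}{
		"empty slice":    {input: nil, expect: 0},
		"single element": {input: []int{3}, expect: 3},
		"many elements":  {input: []int{1, 2, 3}, expect: 6},
	}
	for name, tt := range testcases {
		t.Run(name, func(t *testing.T) {
			if got := sum(tt.input); got != tt.expect {
				t.Errorf("sum(%v) = %d, want %d", tt.input, got, tt.expect)
			}
		})
	}
}
```

A side effect of the map form is that subtests run in a nondeterministic order, which flushes out hidden ordering dependencies between cases — likely one reason the tests above now delete cacheAgentsKey after each run, since they share a real disk-backed store instead of the removed fake storage wrapper.
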
101 changes: 59 additions & 42 deletions pkg/yurthub/cachemanager/cache_manager.go
@@ -25,6 +25,7 @@ import (
"os"
"path"
"strconv"
"strings"
"sync"

"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer"
@@ -90,6 +91,7 @@ type cacheManager struct {
storage StorageWrapper
serializerManager *serializer.SerializerManager
cacheAgents map[string]bool
listReqCollector map[string]struct{}
}

// NewCacheManager creates a new CacheManager
@@ -101,6 +103,7 @@ func NewCacheManager(
storage: storage,
serializerManager: serializerMgr,
cacheAgents: make(map[string]bool),
listReqCollector: make(map[string]struct{}),
}

err := cm.initCacheAgents()
@@ -343,14 +346,12 @@ func (cm *cacheManager) saveListObject(ctx context.Context, info *apirequest.Req
}

list, err := serializer.DecodeResp(serializers, b, reqContentType, respContentType)
if err != nil {
if err != nil || list == nil {
klog.Errorf("failed to decode response in saveOneObject %v", err)
return err
}

switch list.(type) {
case *metav1.Status:
// it's not need to cache for status
if _, ok := list.(*metav1.Status); ok {
klog.Infof("it's not need to cache metav1.Status")
return nil
}
@@ -375,48 +376,48 @@
// in local disk, so when cloud-edge network disconnected,
// yurthub can return empty objects instead of 404 code(not found)
if len(items) == 0 {
// list returns no objects
key, _ := util.KeyFunc(comp, info.Resource, info.Namespace, "")
return cm.storage.Create(key, nil)
}

var errs []error
for i := range items {
name, err := accessor.Name(items[i])
if err != nil || name == "" {
klog.Errorf("failed to get name of list items object, %v", err)
continue
} else if info.Name != "" {
// list with fieldSelector=metadata.name=xxx
if len(items) != 1 {
return fmt.Errorf("%s with fieldSelector=metadata.name=%s, but return more than one objects: %d", util.ReqInfoString(info), info.Name, len(items))
}

ns, err := accessor.Namespace(items[i])
if err != nil {
klog.Errorf("failed to get namespace of list items object, %v", err)
continue
} else if ns == "" {
accessor.SetKind(items[0], kind)
accessor.SetAPIVersion(items[0], apiVersion)
name, _ := accessor.Name(items[0])
ns, _ := accessor.Namespace(items[0])
if ns == "" {
ns = info.Namespace
}

klog.V(5).Infof("path for list item(%d): %s/%s/%s/%s", i, comp, info.Resource, ns, name)
key, err := util.KeyFunc(comp, info.Resource, ns, name)
if err != nil || key == "" {
klog.Errorf("failed to get cache key(%s:%s:%s:%s), %v", comp, info.Resource, ns, name, err)
return err
}

accessor.SetKind(items[i], kind)
accessor.SetAPIVersion(items[i], apiVersion)
err = cm.saveOneObjectWithValidation(key, items[i])
key, _ := util.KeyFunc(comp, info.Resource, ns, name)
err = cm.saveOneObjectWithValidation(key, items[0])
if err == storage.ErrStorageAccessConflict {
klog.V(2).Infof("skip to cache list object because key(%s) is under processing", key)
} else if err != nil {
errs = append(errs, fmt.Errorf("failed to save object(%s), %v", key, err))
return nil
}
}

if len(errs) != 0 {
return fmt.Errorf("failed to save list object, %#+v", errs)
}
return err
} else {
// list all of objects or fieldselector/labelselector
rootKey, _ := util.KeyFunc(comp, info.Resource, info.Namespace, info.Name)
objs := make(map[string]runtime.Object)
for i := range items {
accessor.SetKind(items[i], kind)
accessor.SetAPIVersion(items[i], apiVersion)
name, _ := accessor.Name(items[i])
ns, _ := accessor.Namespace(items[i])
if ns == "" {
ns = info.Namespace
}

return nil
key, _ := util.KeyFunc(comp, info.Resource, ns, name)
objs[key] = items[i]
}

return cm.storage.Replace(rootKey, objs)
}
}
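
The rewritten saveListObject above splits list caching into two paths: a list filtered with fieldSelector=metadata.name=xxx is stored as a single object, while a full list or a labelselector/fieldselector list atomically replaces everything under the resource's root key, so objects deleted on the server no longer linger in the local cache. A rough sketch of that replace semantic over a toy store (the Store type and the key scheme are illustrative assumptions, not the yurthub storage API):

```go
package main

import (
	"fmt"
	"strings"
)

// Store is a toy cache keyed by "component/resource/namespace/name".
type Store struct {
	data map[string]string
}

// Replace drops every entry under rootKey and installs objs in its place,
// mirroring the refactor's "replace the whole list result" behavior.
func (s *Store) Replace(rootKey string, objs map[string]string) {
	for k := range s.data {
		if strings.HasPrefix(k, rootKey) {
			delete(s.data, k)
		}
	}
	for k, v := range objs {
		s.data[k] = v
	}
}

func main() {
	s := &Store{data: map[string]string{
		// cached by an earlier list, since deleted on the server
		"kubelet/pods/default/old-pod": "v1",
	}}
	// A fresh list response that no longer contains old-pod.
	s.Replace("kubelet/pods/default", map[string]string{
		"kubelet/pods/default/new-pod": "v2",
	})
	fmt.Println(s.data) // map[kubelet/pods/default/new-pod:v2]
}
```
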

func (cm *cacheManager) saveOneObject(ctx context.Context, info *apirequest.RequestInfo, b []byte) error {
Expand All @@ -436,15 +437,12 @@ func (cm *cacheManager) saveOneObject(ctx context.Context, info *apirequest.Requ
klog.Errorf("failed to decode response in saveOneObject(reqContentType:%s, respContentType:%s): %s, %v", reqContentType, respContentType, util.ReqInfoString(info), err)
return err
} else if obj == nil {
klog.Info("failed to decode nil object. skip cache")
return nil
} else if _, ok := obj.(*metav1.Status); ok {
klog.Infof("it's not need to cache metav1.Status.")
return nil
} else {
switch obj.(type) {
case *metav1.Status:
// it's not need to cache for status
return nil
}

kind := resourceToKindMap[info.Resource]
apiVersion := schema.GroupVersion{
Group: info.APIGroup,
@@ -587,5 +585,24 @@ func (cm *cacheManager) CanCacheFor(req *http.Request) bool {
return false
}

cm.Lock()
defer cm.Unlock()
if info.Verb == "list" && info.Name == "" {
key, _ := util.KeyFunc(comp, info.Resource, info.Namespace, info.Name)
if _, ok := cm.listReqCollector[key]; !ok {
for k := range cm.listReqCollector {
if len(k) > len(key) {
if strings.Contains(k, key) {
return false
}
} else {
if strings.Contains(key, k) {
return false
}
}
}
cm.listReqCollector[key] = struct{}{}
}
}
return true
}
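
The block added to CanCacheFor above is the guard behind item 2 of the commit message: the first list request for a resource claims its cache key in listReqCollector, and any later list whose key overlaps it — a broader full list or a narrower selector list — is refused caching, since both would write under the same key tree and overwrite each other's entries. The real method holds the manager's mutex around the map; this minimal sketch omits the locking, and the keys shown are illustrative assumptions about the util.KeyFunc scheme:

```go
package main

import (
	"fmt"
	"strings"
)

// listReqCollector records the cache key claimed by each cacheable list request.
var listReqCollector = map[string]struct{}{}

// canCacheList reports whether a new list request may be cached: it is
// rejected when its key overlaps (one key contains the other) with a key
// already claimed by a different list request.
func canCacheList(key string) bool {
	if _, ok := listReqCollector[key]; !ok {
		for k := range listReqCollector {
			if len(k) > len(key) {
				if strings.Contains(k, key) {
					return false
				}
			} else if strings.Contains(key, k) {
				return false
			}
		}
		listReqCollector[key] = struct{}{}
	}
	return true
}

func main() {
	// A namespaced full list claims its key first...
	fmt.Println(canCacheList("kubelet/pods/default")) // true
	// ...so a cluster-wide list of the same resource is refused, because
	// its key is contained in the one already claimed.
	fmt.Println(canCacheList("kubelet/pods")) // false
}
```
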
(11 more changed files not shown)
