Skip to content

Commit

Permalink
refactor cache manager for yurthub (openyurtio#265)
Browse files Browse the repository at this point in the history
1. replace local cache response for list request
2. fix cache conflict for labelselector/fieldselector list and full list.
  • Loading branch information
rambohe-ch authored Apr 22, 2021
1 parent 885d84d commit 260888e
Show file tree
Hide file tree
Showing 15 changed files with 1,314 additions and 717 deletions.
24 changes: 12 additions & 12 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -51,18 +51,18 @@ jobs:
run: find ./* -name "*" | xargs misspell -error
- name: Lint markdown files
run: find ./ -name "*.md" | grep -v enhancements | grep -v .github | xargs mdl -r ~MD010,~MD013,~MD022,~MD024,~MD029,~MD031,~MD032,~MD033,~MD034,~MD036
- name: Check markdown links
run: |
set +e
for name in $(find . -name \*.md | grep -v CHANGELOG); do
if [ -f $name ]; then
markdown-link-check -q $name -c .github/workflows/markdown-link-check.config.json;
if [ $? -ne 0 ]; then
code=1
fi
fi
done
bash -c "exit $code";
# - name: Check markdown links
# run: |
# set +e
# for name in $(find . -name \*.md | grep -v CHANGELOG); do
# if [ -f $name ]; then
# markdown-link-check -q $name -c .github/workflows/markdown-link-check.config.json;
# if [ $? -ne 0 ]; then
# code=1
# fi
# fi
# done
# bash -c "exit $code";

unit-tests:
runs-on: ubuntu-18.04
Expand Down
24 changes: 12 additions & 12 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
# Copyright 2020 The OpenYurt Authors.
#
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#
# http://www.apache.org/licenses/LICENSE-2.0
#
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -17,16 +17,16 @@
all: test build

# Build binaries in the host environment
build:
build:
bash hack/make-rules/build.sh $(WHAT)

# generate yaml files
# generate yaml files
gen-yaml:
hack/make-rules/genyaml.sh $(WHAT)

# Run test
test: fmt vet
go test ./pkg/... ./cmd/... -coverprofile cover.out
go test -v ./pkg/... ./cmd/... -coverprofile cover.out
go test -v -coverpkg=./pkg/yurttunnel/... -coverprofile=yurttunnel-cover.out ./test/integration/yurttunnel_test.go

# Run go fmt against code
Expand All @@ -37,7 +37,7 @@ fmt:
vet:
go vet ./pkg/... ./cmd/...

# Build binaries and docker images.
# Build binaries and docker images.
# NOTE: this rule can take time, as we build binaries inside containers
#
# ARGS:
Expand All @@ -47,18 +47,18 @@ vet:
# set it as cn.
#
# Examples:
# # compile yurthub, yurt-controller-manager and yurtctl-servant with
# # compile yurthub, yurt-controller-manager and yurtctl-servant with
# # architectures arm64 and arm in the mainland China
# make release WHAT="yurthub yurt-controller-manager yurtctl-servant" ARCH="arm64 arm" REGION=cn
#
# # compile all components with all architectures (i.e., amd64, arm64, arm)
# make relase
# make release
release:
bash hack/make-rules/release-images.sh

clean:
clean:
-rm -Rf _output
-rm -Rf dockerbuild

e2e:
hack/make-rules/build-e2e.sh
e2e:
hack/make-rules/build-e2e.sh
36 changes: 28 additions & 8 deletions pkg/yurthub/cachemanager/cache_agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,18 @@ import (
"strings"
"testing"

"github.com/openyurtio/openyurt/pkg/yurthub/storage/disk"

"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/endpoints/request"
)

func TestInitCacheAgents(t *testing.T) {
s := NewFakeStorageWrapper()
dStorage, err := disk.NewDiskStorage(rootDir)
if err != nil {
t.Errorf("failed to create disk storage, %v", err)
}
s := NewStorageWrapper(dStorage)
m, _ := NewCacheManager(s, nil)

// default cache agents in fake store
Expand Down Expand Up @@ -62,23 +68,32 @@ func TestInitCacheAgents(t *testing.T) {
if !compareAgents(gotAgents2, m.ListCacheAgents()) {
t.Errorf("Got agents: %v, cache agents map: %v", gotAgents2, m.ListCacheAgents())
}

err = s.Delete(cacheAgentsKey)
if err != nil {
t.Errorf("failed to delete cache agents key, %v", err)
}
}

func TestUpdateCacheAgents(t *testing.T) {
s := NewFakeStorageWrapper()
dStorage, err := disk.NewDiskStorage(rootDir)
if err != nil {
t.Errorf("failed to create disk storage, %v", err)
}
s := NewStorageWrapper(dStorage)
m, _ := NewCacheManager(s, nil)

tests := []struct {
testcases := map[string]struct {
desc string
addAgents []string
expectAgents []string
}{
{desc: "add one agent", addAgents: []string{"agent1"}, expectAgents: append(defaultCacheAgents, "agent1")},
{desc: "update with two agents", addAgents: []string{"agent2", "agent3"}, expectAgents: append(defaultCacheAgents, "agent2", "agent3")},
{desc: "update with more two agents", addAgents: []string{"agent4", "agent5"}, expectAgents: append(defaultCacheAgents, "agent4", "agent5")},
"add one agent": {addAgents: []string{"agent1"}, expectAgents: append(defaultCacheAgents, "agent1")},
"update with two agents": {addAgents: []string{"agent2", "agent3"}, expectAgents: append(defaultCacheAgents, "agent2", "agent3")},
"update with more two agents": {addAgents: []string{"agent4", "agent5"}, expectAgents: append(defaultCacheAgents, "agent4", "agent5")},
}
for _, tt := range tests {
t.Run(tt.desc, func(t *testing.T) {
for k, tt := range testcases {
t.Run(k, func(t *testing.T) {

// add agents
err := m.UpdateCacheAgents(tt.addAgents)
Expand All @@ -99,6 +114,11 @@ func TestUpdateCacheAgents(t *testing.T) {
if !compareAgents(gotAgents, m.ListCacheAgents()) {
t.Errorf("Got agents: %v, cache agents map: %v", gotAgents, m.ListCacheAgents())
}

err = s.Delete(cacheAgentsKey)
if err != nil {
t.Errorf("failed to delete cache agents key, %v", err)
}
})
}
}
Expand Down
142 changes: 92 additions & 50 deletions pkg/yurthub/cachemanager/cache_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,10 @@ type CacheManager interface {

type cacheManager struct {
sync.RWMutex
storage StorageWrapper
serializerManager *serializer.SerializerManager
cacheAgents map[string]bool
storage StorageWrapper
serializerManager *serializer.SerializerManager
cacheAgents map[string]bool
listSelectorCollector map[string]string
}

// NewCacheManager creates a new CacheManager
Expand All @@ -66,9 +67,10 @@ func NewCacheManager(
serializerMgr *serializer.SerializerManager,
) (CacheManager, error) {
cm := &cacheManager{
storage: storage,
serializerManager: serializerMgr,
cacheAgents: make(map[string]bool),
storage: storage,
serializerManager: serializerMgr,
cacheAgents: make(map[string]bool),
listSelectorCollector: make(map[string]string),
}

err := cm.initCacheAgents()
Expand Down Expand Up @@ -142,17 +144,30 @@ func (cm *cacheManager) queryListObject(req *http.Request) (runtime.Object, erro
return nil, err
}

var gvk schema.GroupVersionKind
var kind string
objs, err := cm.storage.List(key)
if err != nil {
return nil, err
} else if len(objs) == 0 {
return nil, nil
gvk, err = serializer.UnsafeDefaultRESTMapper.KindFor(schema.GroupVersionResource{
Group: info.APIGroup,
Version: info.APIVersion,
Resource: info.Resource,
})
if err != nil {
return nil, err
}
kind = gvk.Kind
} else {
kind = objs[0].GetObjectKind().GroupVersionKind().Kind
}

var listObj runtime.Object
listGvk := schema.GroupVersionKind{
Group: info.APIGroup,
Version: info.APIVersion,
Kind: objs[0].GetObjectKind().GroupVersionKind().Kind + "List",
Kind: kind + "List",
}
if scheme.Scheme.Recognizes(listGvk) {
listObj, err = scheme.Scheme.New(listGvk)
Expand Down Expand Up @@ -304,14 +319,12 @@ func (cm *cacheManager) saveListObject(ctx context.Context, info *apirequest.Req
}

list, err := serializer.DecodeResp(serializers, b, reqContentType, respContentType)
if err != nil {
if err != nil || list == nil {
klog.Errorf("failed to decode response in saveOneObject %v", err)
return err
}

switch list.(type) {
case *metav1.Status:
// it's not need to cache for status
if _, ok := list.(*metav1.Status); ok {
klog.Infof("it's not need to cache metav1.Status")
return nil
}
Expand All @@ -336,48 +349,48 @@ func (cm *cacheManager) saveListObject(ctx context.Context, info *apirequest.Req
// in local disk, so when cloud-edge network disconnected,
// yurthub can return empty objects instead of 404 code(not found)
if len(items) == 0 {
// list returns no objects
key, _ := util.KeyFunc(comp, info.Resource, info.Namespace, "")
return cm.storage.Create(key, nil)
}

var errs []error
for i := range items {
name, err := accessor.Name(items[i])
if err != nil || name == "" {
klog.Errorf("failed to get name of list items object, %v", err)
continue
} else if info.Name != "" {
// list with fieldSelector=metadata.name=xxx
if len(items) != 1 {
return fmt.Errorf("%s with fieldSelector=metadata.name=%s, but return more than one objects: %d", util.ReqInfoString(info), info.Name, len(items))
}

ns, err := accessor.Namespace(items[i])
if err != nil {
klog.Errorf("failed to get namespace of list items object, %v", err)
continue
} else if ns == "" {
accessor.SetKind(items[0], kind)
accessor.SetAPIVersion(items[0], apiVersion)
name, _ := accessor.Name(items[0])
ns, _ := accessor.Namespace(items[0])
if ns == "" {
ns = info.Namespace
}

klog.V(5).Infof("path for list item(%d): %s/%s/%s/%s", i, comp, info.Resource, ns, name)
key, err := util.KeyFunc(comp, info.Resource, ns, name)
if err != nil || key == "" {
klog.Errorf("failed to get cache key(%s:%s:%s:%s), %v", comp, info.Resource, ns, name, err)
return err
}

accessor.SetKind(items[i], kind)
accessor.SetAPIVersion(items[i], apiVersion)
err = cm.saveOneObjectWithValidation(key, items[i])
key, _ := util.KeyFunc(comp, info.Resource, ns, name)
err = cm.saveOneObjectWithValidation(key, items[0])
if err == storage.ErrStorageAccessConflict {
klog.V(2).Infof("skip to cache list object because key(%s) is under processing", key)
} else if err != nil {
errs = append(errs, fmt.Errorf("failed to save object(%s), %v", key, err))
return nil
}
}

if len(errs) != 0 {
return fmt.Errorf("failed to save list object, %#+v", errs)
}
return err
} else {
// list all of objects or fieldselector/labelselector
rootKey, _ := util.KeyFunc(comp, info.Resource, info.Namespace, info.Name)
objs := make(map[string]runtime.Object)
for i := range items {
accessor.SetKind(items[i], kind)
accessor.SetAPIVersion(items[i], apiVersion)
name, _ := accessor.Name(items[i])
ns, _ := accessor.Namespace(items[i])
if ns == "" {
ns = info.Namespace
}

return nil
key, _ := util.KeyFunc(comp, info.Resource, ns, name)
objs[key] = items[i]
}

return cm.storage.Replace(rootKey, objs)
}
}

func (cm *cacheManager) saveOneObject(ctx context.Context, info *apirequest.RequestInfo, b []byte) error {
Expand All @@ -397,14 +410,11 @@ func (cm *cacheManager) saveOneObject(ctx context.Context, info *apirequest.Requ
klog.Errorf("failed to decode response in saveOneObject(reqContentType:%s, respContentType:%s): %s, %v", reqContentType, respContentType, util.ReqInfoString(info), err)
return err
} else if obj == nil {
klog.Info("failed to decode nil object. skip cache")
return nil
} else if _, ok := obj.(*metav1.Status); ok {
klog.Infof("it's not need to cache metav1.Status.")
return nil
} else {
switch obj.(type) {
case *metav1.Status:
// it's not need to cache for status
return nil
}
}

var name string
Expand Down Expand Up @@ -535,5 +545,37 @@ func (cm *cacheManager) CanCacheFor(req *http.Request) bool {
return false
}

cm.Lock()
defer cm.Unlock()
if info.Verb == "list" && info.Name == "" {
key, _ := util.KeyFunc(comp, info.Resource, info.Namespace, info.Name)
selector, _ := util.ListSelectorFrom(ctx)
if oldSelector, ok := cm.listSelectorCollector[key]; ok {
if oldSelector != selector {
// list requests that have the same path but with different selector, for example:
// request1: http://{ip:port}/api/v1/default/pods?labelSelector=foo=bar
// request2: http://{ip:port}/api/v1/default/pods?labelSelector=foo2=bar2
// because func queryListObject() will get all pods for both requests instead of
// getting pods by request selector. so cache manager can not support same path list
// requests that have different selectors.
return false
}
} else {
// list requests that get the same resources but with different path, for example:
// request1: http://{ip/port}/api/v1/pods?fieldSelector=spec.nodeName=foo
// request2: http://{ip/port}/api/v1/default/pods?fieldSelector=spec.nodeName=foo
// because func queryListObject() will get all pods for both requests instead of
// getting pods by request selector. so cache manager can not support getting same resource
// list requests that have different paths.
for k := range cm.listSelectorCollector {
if len(k) > len(key) && strings.Contains(k, key) {
return false
} else if len(k) < len(key) && strings.Contains(key, k) {
return false
}
}
cm.listSelectorCollector[key] = selector
}
}
return true
}
Loading

0 comments on commit 260888e

Please sign in to comment.