Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor cache manager for yurthub #265

Merged
merged 1 commit into from
Apr 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 12 additions & 12 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -51,18 +51,18 @@ jobs:
run: find ./* -name "*" | xargs misspell -error
- name: Lint markdown files
run: find ./ -name "*.md" | grep -v enhancements | grep -v .github | xargs mdl -r ~MD010,~MD013,~MD022,~MD024,~MD029,~MD031,~MD032,~MD033,~MD034,~MD036
- name: Check markdown links
run: |
set +e
for name in $(find . -name \*.md | grep -v CHANGELOG); do
if [ -f $name ]; then
markdown-link-check -q $name -c .github/workflows/markdown-link-check.config.json;
if [ $? -ne 0 ]; then
code=1
fi
fi
done
bash -c "exit $code";
# - name: Check markdown links
# run: |
# set +e
# for name in $(find . -name \*.md | grep -v CHANGELOG); do
# if [ -f $name ]; then
# markdown-link-check -q $name -c .github/workflows/markdown-link-check.config.json;
# if [ $? -ne 0 ]; then
# code=1
# fi
# fi
# done
# bash -c "exit $code";

unit-tests:
runs-on: ubuntu-18.04
Expand Down
24 changes: 12 additions & 12 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
# Copyright 2020 The OpenYurt Authors.
#
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#
# http://www.apache.org/licenses/LICENSE-2.0
#
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -17,16 +17,16 @@
all: test build

# Build binaries in the host environment
build:
build:
bash hack/make-rules/build.sh $(WHAT)

# generate yaml files
# generate yaml files
gen-yaml:
hack/make-rules/genyaml.sh $(WHAT)

# Run test
test: fmt vet
go test ./pkg/... ./cmd/... -coverprofile cover.out
go test -v ./pkg/... ./cmd/... -coverprofile cover.out
go test -v -coverpkg=./pkg/yurttunnel/... -coverprofile=yurttunnel-cover.out ./test/integration/yurttunnel_test.go

# Run go fmt against code
Expand All @@ -37,7 +37,7 @@ fmt:
vet:
go vet ./pkg/... ./cmd/...

# Build binaries and docker images.
# Build binaries and docker images.
# NOTE: this rule can take time, as we build binaries inside containers
#
# ARGS:
Expand All @@ -47,18 +47,18 @@ vet:
# set it as cn.
#
# Examples:
# # compile yurthub, yurt-controller-manager and yurtctl-servant with
# # compile yurthub, yurt-controller-manager and yurtctl-servant with
# # architectures arm64 and arm in the mainland China
# make release WHAT="yurthub yurt-controller-manager yurtctl-servant" ARCH="arm64 arm" REGION=cn
#
# # compile all components with all architectures (i.e., amd64, arm64, arm)
# make relase
# make relase
release:
bash hack/make-rules/release-images.sh

clean:
clean:
-rm -Rf _output
-rm -Rf dockerbuild

e2e:
hack/make-rules/build-e2e.sh
e2e:
hack/make-rules/build-e2e.sh
36 changes: 28 additions & 8 deletions pkg/yurthub/cachemanager/cache_agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,18 @@ import (
"strings"
"testing"

"github.com/openyurtio/openyurt/pkg/yurthub/storage/disk"

"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/endpoints/request"
)

func TestInitCacheAgents(t *testing.T) {
s := NewFakeStorageWrapper()
dStorage, err := disk.NewDiskStorage(rootDir)
if err != nil {
t.Errorf("failed to create disk storage, %v", err)
}
s := NewStorageWrapper(dStorage)
m, _ := NewCacheManager(s, nil)

// default cache agents in fake store
Expand Down Expand Up @@ -62,23 +68,32 @@ func TestInitCacheAgents(t *testing.T) {
if !compareAgents(gotAgents2, m.ListCacheAgents()) {
t.Errorf("Got agents: %v, cache agents map: %v", gotAgents2, m.ListCacheAgents())
}

err = s.Delete(cacheAgentsKey)
if err != nil {
t.Errorf("failed to delete cache agents key, %v", err)
}
}

func TestUpdateCacheAgents(t *testing.T) {
s := NewFakeStorageWrapper()
dStorage, err := disk.NewDiskStorage(rootDir)
if err != nil {
t.Errorf("failed to create disk storage, %v", err)
}
s := NewStorageWrapper(dStorage)
m, _ := NewCacheManager(s, nil)

tests := []struct {
testcases := map[string]struct {
desc string
addAgents []string
expectAgents []string
}{
{desc: "add one agent", addAgents: []string{"agent1"}, expectAgents: append(defaultCacheAgents, "agent1")},
{desc: "update with two agents", addAgents: []string{"agent2", "agent3"}, expectAgents: append(defaultCacheAgents, "agent2", "agent3")},
{desc: "update with more two agents", addAgents: []string{"agent4", "agent5"}, expectAgents: append(defaultCacheAgents, "agent4", "agent5")},
"add one agent": {addAgents: []string{"agent1"}, expectAgents: append(defaultCacheAgents, "agent1")},
"update with two agents": {addAgents: []string{"agent2", "agent3"}, expectAgents: append(defaultCacheAgents, "agent2", "agent3")},
"update with more two agents": {addAgents: []string{"agent4", "agent5"}, expectAgents: append(defaultCacheAgents, "agent4", "agent5")},
}
for _, tt := range tests {
t.Run(tt.desc, func(t *testing.T) {
for k, tt := range testcases {
t.Run(k, func(t *testing.T) {

// add agents
err := m.UpdateCacheAgents(tt.addAgents)
Expand All @@ -99,6 +114,11 @@ func TestUpdateCacheAgents(t *testing.T) {
if !compareAgents(gotAgents, m.ListCacheAgents()) {
t.Errorf("Got agents: %v, cache agents map: %v", gotAgents, m.ListCacheAgents())
}

err = s.Delete(cacheAgentsKey)
if err != nil {
t.Errorf("failed to delete cache agents key, %v", err)
}
})
}
}
Expand Down
142 changes: 92 additions & 50 deletions pkg/yurthub/cachemanager/cache_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,10 @@ type CacheManager interface {

type cacheManager struct {
sync.RWMutex
storage StorageWrapper
serializerManager *serializer.SerializerManager
cacheAgents map[string]bool
storage StorageWrapper
serializerManager *serializer.SerializerManager
cacheAgents map[string]bool
listSelectorCollector map[string]string
}

// NewCacheManager creates a new CacheManager
Expand All @@ -66,9 +67,10 @@ func NewCacheManager(
serializerMgr *serializer.SerializerManager,
) (CacheManager, error) {
cm := &cacheManager{
storage: storage,
serializerManager: serializerMgr,
cacheAgents: make(map[string]bool),
storage: storage,
serializerManager: serializerMgr,
cacheAgents: make(map[string]bool),
listSelectorCollector: make(map[string]string),
}

err := cm.initCacheAgents()
Expand Down Expand Up @@ -142,17 +144,30 @@ func (cm *cacheManager) queryListObject(req *http.Request) (runtime.Object, erro
return nil, err
}

var gvk schema.GroupVersionKind
var kind string
objs, err := cm.storage.List(key)
if err != nil {
return nil, err
} else if len(objs) == 0 {
return nil, nil
gvk, err = serializer.UnsafeDefaultRESTMapper.KindFor(schema.GroupVersionResource{
Group: info.APIGroup,
Version: info.APIVersion,
Resource: info.Resource,
})
if err != nil {
return nil, err
}
kind = gvk.Kind
} else {
kind = objs[0].GetObjectKind().GroupVersionKind().Kind
}

var listObj runtime.Object
listGvk := schema.GroupVersionKind{
Group: info.APIGroup,
Version: info.APIVersion,
Kind: objs[0].GetObjectKind().GroupVersionKind().Kind + "List",
Kind: kind + "List",
}
if scheme.Scheme.Recognizes(listGvk) {
listObj, err = scheme.Scheme.New(listGvk)
Expand Down Expand Up @@ -304,14 +319,12 @@ func (cm *cacheManager) saveListObject(ctx context.Context, info *apirequest.Req
}

list, err := serializer.DecodeResp(serializers, b, reqContentType, respContentType)
if err != nil {
if err != nil || list == nil {
klog.Errorf("failed to decode response in saveOneObject %v", err)
return err
}

switch list.(type) {
case *metav1.Status:
// it's not need to cache for status
if _, ok := list.(*metav1.Status); ok {
klog.Infof("it's not need to cache metav1.Status")
return nil
}
Expand All @@ -336,48 +349,48 @@ func (cm *cacheManager) saveListObject(ctx context.Context, info *apirequest.Req
// in local disk, so when cloud-edge network disconnected,
// yurthub can return empty objects instead of 404 code(not found)
if len(items) == 0 {
// list returns no objects
key, _ := util.KeyFunc(comp, info.Resource, info.Namespace, "")
return cm.storage.Create(key, nil)
}

var errs []error
for i := range items {
name, err := accessor.Name(items[i])
if err != nil || name == "" {
klog.Errorf("failed to get name of list items object, %v", err)
continue
} else if info.Name != "" {
// list with fieldSelector=metadata.name=xxx
if len(items) != 1 {
return fmt.Errorf("%s with fieldSelector=metadata.name=%s, but return more than one objects: %d", util.ReqInfoString(info), info.Name, len(items))
}

ns, err := accessor.Namespace(items[i])
if err != nil {
klog.Errorf("failed to get namespace of list items object, %v", err)
continue
} else if ns == "" {
accessor.SetKind(items[0], kind)
accessor.SetAPIVersion(items[0], apiVersion)
name, _ := accessor.Name(items[0])
ns, _ := accessor.Namespace(items[0])
if ns == "" {
ns = info.Namespace
}

klog.V(5).Infof("path for list item(%d): %s/%s/%s/%s", i, comp, info.Resource, ns, name)
key, err := util.KeyFunc(comp, info.Resource, ns, name)
if err != nil || key == "" {
klog.Errorf("failed to get cache key(%s:%s:%s:%s), %v", comp, info.Resource, ns, name, err)
return err
}

accessor.SetKind(items[i], kind)
accessor.SetAPIVersion(items[i], apiVersion)
err = cm.saveOneObjectWithValidation(key, items[i])
key, _ := util.KeyFunc(comp, info.Resource, ns, name)
err = cm.saveOneObjectWithValidation(key, items[0])
if err == storage.ErrStorageAccessConflict {
klog.V(2).Infof("skip to cache list object because key(%s) is under processing", key)
} else if err != nil {
errs = append(errs, fmt.Errorf("failed to save object(%s), %v", key, err))
return nil
}
}

if len(errs) != 0 {
return fmt.Errorf("failed to save list object, %#+v", errs)
}
return err
} else {
// list all of objects or fieldselector/labelselector
rootKey, _ := util.KeyFunc(comp, info.Resource, info.Namespace, info.Name)
objs := make(map[string]runtime.Object)
for i := range items {
accessor.SetKind(items[i], kind)
accessor.SetAPIVersion(items[i], apiVersion)
name, _ := accessor.Name(items[i])
ns, _ := accessor.Namespace(items[i])
if ns == "" {
ns = info.Namespace
}

return nil
key, _ := util.KeyFunc(comp, info.Resource, ns, name)
objs[key] = items[i]
}

return cm.storage.Replace(rootKey, objs)
}
}

func (cm *cacheManager) saveOneObject(ctx context.Context, info *apirequest.RequestInfo, b []byte) error {
Expand All @@ -397,14 +410,11 @@ func (cm *cacheManager) saveOneObject(ctx context.Context, info *apirequest.Requ
klog.Errorf("failed to decode response in saveOneObject(reqContentType:%s, respContentType:%s): %s, %v", reqContentType, respContentType, util.ReqInfoString(info), err)
return err
} else if obj == nil {
klog.Info("failed to decode nil object. skip cache")
return nil
} else if _, ok := obj.(*metav1.Status); ok {
klog.Infof("it's not need to cache metav1.Status.")
return nil
} else {
switch obj.(type) {
case *metav1.Status:
// it's not need to cache for status
return nil
}
}

var name string
Expand Down Expand Up @@ -535,5 +545,37 @@ func (cm *cacheManager) CanCacheFor(req *http.Request) bool {
return false
}

cm.Lock()
defer cm.Unlock()
if info.Verb == "list" && info.Name == "" {
key, _ := util.KeyFunc(comp, info.Resource, info.Namespace, info.Name)
selector, _ := util.ListSelectorFrom(ctx)
if oldSelector, ok := cm.listSelectorCollector[key]; ok {
if oldSelector != selector {
// list requests that have the same path but with different selector, for example:
// request1: http://{ip:port}/api/v1/default/pods?labelSelector=foo=bar
// request2: http://{ip:port}/api/v1/default/pods?labelSelector=foo2=bar2
// because func queryListObject() will get all pods for both requests instead of
// getting pods by request selector. so cache manager can not support same path list
// requests that has different selector.
return false
}
} else {
// list requests that get the same resources but with different path, for example:
// request1: http://{ip/port}/api/v1/pods?fieldSelector=spec.nodeName=foo
// request2: http://{ip/port}/api/v1/default/pods?fieldSelector=spec.nodeName=foo
// because func queryListObject() will get all pods for both requests instead of
// getting pods by request selector. so cache manager can not support getting same resource
// list requests that has different path.
for k := range cm.listSelectorCollector {
if len(k) > len(key) && strings.Contains(k, key) {
return false
} else if len(k) < len(key) && strings.Contains(key, k) {
return false
}
}
cm.listSelectorCollector[key] = selector
}
}
return true
}
Loading