From e005d80afeab09163289feac644dfd6a2f12fa41 Mon Sep 17 00:00:00 2001 From: kkf1 <46839758+kkf1@users.noreply.github.com> Date: Wed, 29 Mar 2023 11:10:31 +0800 Subject: [PATCH] [fix] fix inconsistency bug between cache layer and etcd. (#287) * [fix] fix inconsistency bug between cache layer and etcd. * [fix] modify review comments. * [fix] add ut for kv_cache.go. --- examples/dev/kie-conf.yaml | 8 +- server/config/struct.go | 11 +- server/datasource/etcd/kv/kv_cache.go | 152 +++++++----- server/datasource/etcd/kv/kv_cache_test.go | 270 +++++++++++++++++++-- 4 files changed, 346 insertions(+), 95 deletions(-) diff --git a/examples/dev/kie-conf.yaml b/examples/dev/kie-conf.yaml index 11b91067..d4aabb1e 100644 --- a/examples/dev/kie-conf.yaml +++ b/examples/dev/kie-conf.yaml @@ -18,4 +18,10 @@ db: # rsaPublicKeyFile: ./examples/dev/public.key sync: # turn on the synchronization switch related operations will be written to the task in the db - enabled: false \ No newline at end of file + enabled: false +#cache: +# labels: +# - environment +# - service +# - app +# - version diff --git a/server/config/struct.go b/server/config/struct.go index 83e91f08..44669def 100644 --- a/server/config/struct.go +++ b/server/config/struct.go @@ -19,9 +19,10 @@ package config // Config is yaml file struct type Config struct { - DB DB `yaml:"db"` - RBAC RBAC `yaml:"rbac"` - Sync Sync `yaml:"sync"` + DB DB `yaml:"db"` + RBAC RBAC `yaml:"rbac"` + Sync Sync `yaml:"sync"` + Cache Cache `yaml:"cache"` //config from cli ConfigFile string NodeName string @@ -59,3 +60,7 @@ type RBAC struct { type Sync struct { Enabled bool `yaml:"enabled"` } + +type Cache struct { + Labels []string `yaml:"labels"` +} diff --git a/server/datasource/etcd/kv/kv_cache.go b/server/datasource/etcd/kv/kv_cache.go index 5776bf73..2930a99d 100644 --- a/server/datasource/etcd/kv/kv_cache.go +++ b/server/datasource/etcd/kv/kv_cache.go @@ -11,6 +11,7 @@ import ( "github.com/apache/servicecomb-kie/pkg/model" "github.com/apache/servicecomb-kie/pkg/stringutil" + "github.com/apache/servicecomb-kie/server/config" "github.com/apache/servicecomb-kie/server/datasource" "github.com/apache/servicecomb-kie/server/datasource/etcd/key" "github.com/go-chassis/foundation/backoff" @@ -37,21 +38,27 @@ const ( type IDSet map[string]struct{} -type Cache struct { - timeOut time.Duration - client etcdadpt.Client - revision int64 - kvIDCache sync.Map - kvDocCache *goCache.Cache +type LabelsSet map[string]struct{} + +type CacheSearchReq struct { + Domain string + Project string + Opts *datasource.FindOptions + Regex *regexp.Regexp } func NewKvCache() *Cache { kvDocCache := goCache.New(cacheExpirationTime, cacheCleanupInterval) + labelsSet := LabelsSet{} + for _, label := range config.Configurations.Cache.Labels { + labelsSet[label] = struct{}{} + } return &Cache{ timeOut: etcdWatchTimeout, client: etcdadpt.Instance(), revision: 0, kvDocCache: kvDocCache, + labelsSet: labelsSet, } } @@ -59,11 +66,13 @@ func Enabled() bool { return kvCache != nil } -type CacheSearchReq struct { - Domain string - Project string - Opts *datasource.FindOptions - Regex *regexp.Regexp +type Cache struct { + timeOut time.Duration + client etcdadpt.Client + revision int64 + kvIDCache sync.Map + kvDocCache *goCache.Cache + labelsSet LabelsSet } func (kc *Cache) Refresh(ctx context.Context) { @@ -130,7 +139,7 @@ func (kc *Cache) list(ctx context.Context) (*etcdadpt.Response, error) { return rsp, nil } -func (kc *Cache) watchCallBack(message string, rsp *etcdadpt.Response) error { +func (kc *Cache) watchCallBack(_ string, rsp *etcdadpt.Response) error { if rsp == nil || len(rsp.Kvs) == 0 { return fmt.Errorf("unknown event") } @@ -154,6 +163,9 @@ func (kc *Cache) cachePut(rsp *etcdadpt.Response) { openlog.Error(fmt.Sprintf("failed to unmarshal kv, err %v", err)) continue } + if !kc.isInLabelsSet(kvDoc.Labels) { + continue + } kc.StoreKvDoc(kvDoc.ID, kvDoc) cacheKey := kc.GetCacheKey(kvDoc.Domain, kvDoc.Project, kvDoc.Labels) m, ok := kc.LoadKvIDSet(cacheKey) @@ -220,46 +232,6 @@ func (kc *Cache) DeleteKvDoc(kvID string) { kc.kvDocCache.Delete(kvID) } -func Search(ctx context.Context, req *CacheSearchReq) (*model.KVResponse, bool, error) { - if !req.Opts.ExactLabels { - return nil, false, nil - } - - openlog.Debug(fmt.Sprintf("using cache to search kv, domain %v, project %v, opts %+v", req.Domain, req.Project, *req.Opts)) - result := &model.KVResponse{ - Data: []*model.KVDoc{}, - } - cacheKey := kvCache.GetCacheKey(req.Domain, req.Project, req.Opts.Labels) - kvIds, ok := kvCache.LoadKvIDSet(cacheKey) - if !ok { - kvCache.StoreKvIDSet(cacheKey, IDSet{}) - return result, true, nil - } - - var docs []*model.KVDoc - - var kvIdsLeft []string - for kvID := range kvIds { - if doc, ok := kvCache.LoadKvDoc(kvID); ok { - docs = append(docs, doc) - continue - } - kvIdsLeft = append(kvIdsLeft, kvID) - } - - tpData := kvCache.getKvFromEtcd(ctx, req, kvIdsLeft) - docs = append(docs, tpData...) - - for _, doc := range docs { - if isMatch(req, doc) { - datasource.ClearPart(doc) - result.Data = append(result.Data, doc) - } - } - result.Total = len(result.Data) - return result, true, nil -} - func (kc *Cache) getKvFromEtcd(ctx context.Context, req *CacheSearchReq, kvIdsLeft []string) []*model.KVDoc { if len(kvIdsLeft) == 0 { return nil @@ -294,19 +266,6 @@ func (kc *Cache) getKvFromEtcd(ctx context.Context, req *CacheSearchReq, kvIdsLe return docs } -func isMatch(req *CacheSearchReq, doc *model.KVDoc) bool { - if doc == nil { - return false - } - if req.Opts.Status != "" && doc.Status != req.Opts.Status { - return false - } - if req.Regex != nil && !req.Regex.MatchString(doc.Key) { - return false - } - return true -} - func (kc *Cache) GetKvDoc(kv *mvccpb.KeyValue) (*model.KVDoc, error) { kvDoc := &model.KVDoc{} err := json.Unmarshal(kv.Value, kvDoc) @@ -326,3 +285,66 @@ func (kc *Cache) GetCacheKey(domain, project string, labels map[string]string) s }, "/") return inputKey } + +func (kc *Cache) isInLabelsSet(Labels map[string]string) bool { + for label := range Labels { + if _, ok := kc.labelsSet[label]; !ok { + return false + } + } + return true +} + +func Search(ctx context.Context, req *CacheSearchReq) (*model.KVResponse, bool, error) { + result := &model.KVResponse{ + Data: []*model.KVDoc{}, + } + if !req.Opts.ExactLabels || !kvCache.isInLabelsSet(req.Opts.Labels) { + return result, false, nil + } + + openlog.Debug(fmt.Sprintf("using cache to search kv, domain %v, project %v, opts %+v", req.Domain, req.Project, *req.Opts)) + cacheKey := kvCache.GetCacheKey(req.Domain, req.Project, req.Opts.Labels) + + kvIds, ok := kvCache.LoadKvIDSet(cacheKey) + if !ok { + kvCache.StoreKvIDSet(cacheKey, IDSet{}) + return result, true, nil + } + + var docs []*model.KVDoc + + var kvIdsLeft []string + for kvID := range kvIds { + if doc, ok := kvCache.LoadKvDoc(kvID); ok { + docs = append(docs, doc) + continue + } + kvIdsLeft = append(kvIdsLeft, kvID) + } + + tpData := kvCache.getKvFromEtcd(ctx, req, kvIdsLeft) + docs = append(docs, tpData...) + + for _, doc := range docs { + if isMatch(req, doc) { + datasource.ClearPart(doc) + result.Data = append(result.Data, doc) + } + } + result.Total = len(result.Data) + return result, true, nil +} + +func isMatch(req *CacheSearchReq, doc *model.KVDoc) bool { + if doc == nil { + return false + } + if req.Opts.Status != "" && doc.Status != req.Opts.Status { + return false + } + if req.Regex != nil && !req.Regex.MatchString(doc.Key) { + return false + } + return true +} diff --git a/server/datasource/etcd/kv/kv_cache_test.go b/server/datasource/etcd/kv/kv_cache_test.go index 22866996..c128b5ac 100644 --- a/server/datasource/etcd/kv/kv_cache_test.go +++ b/server/datasource/etcd/kv/kv_cache_test.go @@ -1,46 +1,67 @@ package kv import ( + "fmt" + "reflect" "testing" + "time" + "github.com/apache/servicecomb-kie/pkg/model" + "github.com/apache/servicecomb-kie/server/config" "github.com/little-cui/etcdadpt" + goCache "github.com/patrickmn/go-cache" "github.com/stretchr/testify/assert" "go.etcd.io/etcd/api/v3/mvccpb" ) -type args struct { - rsp *etcdadpt.Response +func init() { + config.Configurations.Cache.Labels = []string{"environment"} } func TestCachePut(t *testing.T) { + type args struct { + rsp *etcdadpt.Response + } tests := []struct { name string args args want int }{ - {"put 0 kvDoc, cache should store 0 kvDoc", - args{&etcdadpt.Response{Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{}}}, - 0, + { + name: "put 0 kvDoc, cache should store 0 kvDoc", + args: args{&etcdadpt.Response{Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{}}}, + want: 0, }, - {"put 1 kvDoc, cache should store 1 kvDoc", - args{&etcdadpt.Response{Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{ + { + name: "put 1 kvDoc, cache should store 1 kvDoc", + args: args{&etcdadpt.Response{Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{ {Value: []byte(`{"id":"1", "key":"withFruit", "value":"no", "labels":{"environment":"testing"}}`)}, }}}, - 1, + want: 1, }, - {"put 2 kvDocs with different kvIds, cache should store 2 kvDocs", - args{&etcdadpt.Response{Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{ + { + name: "put 2 kvDocs with different kvIds, cache should store 2 kvDocs", + args: args{&etcdadpt.Response{Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{ {Value: []byte(`{"id":"1", "key":"withFruit", "value":"no", "labels":{"environment":"testing"}}`)}, {Value: []byte(`{"id":"2", "key":"withToys", "value":"yes", "labels":{"environment":"testing"}}`)}, }}}, - 2, + want: 2, }, - {"put 2 kvDocs with same kvId, cache should store 1 kvDocs", - args{&etcdadpt.Response{Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{ + { + name: "put 2 kvDocs with same kvId, cache should store 1 kvDocs", + args: args{&etcdadpt.Response{Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{ {Value: []byte(`{"id":"1", "key":"withFruit", "value":"no", "labels":{"environment":"testing"}}`)}, {Value: []byte(`{"id":"1", "key":"withToys", "value":"yes", "labels":{"environment":"testing"}}`)}, }}}, - 1, + want: 1, + }, + { + name: "put 2 kvDoc, but labels are not cached, cache should store 0 kvDoc", + args: args{&etcdadpt.Response{Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{ + {Value: []byte(`{"id":"1", "key":"withFruit", "value":"no", "labels":{"env":"testing"}}`)}, + {Value: []byte(`{"id":"1", "key":"withToys", "value":"yes", "labels":{"env":"testing"}}`)}, + }}}, + want: 0, }, } for _, tt := range tests { @@ -54,33 +75,40 @@ func TestCachePut(t *testing.T) { } func TestCacheDelete(t *testing.T) { + type args struct { + rsp *etcdadpt.Response + } tests := []struct { name string args args want int }{ - {"first put 2 kvDocs, then delete 0 kvDoc, cache should store 2 kvDocs", - args{&etcdadpt.Response{Action: etcdadpt.ActionDelete, Kvs: []*mvccpb.KeyValue{}}}, - 2, + { + name: "first put 2 kvDocs, then delete 0 kvDoc, cache should store 2 kvDocs", + args: args{&etcdadpt.Response{Action: etcdadpt.ActionDelete, Kvs: []*mvccpb.KeyValue{}}}, + want: 2, }, - {"first put 2 kvDocs, then delete kvId=1, cache should store 1 kvDocs", - args{&etcdadpt.Response{Action: etcdadpt.ActionDelete, Kvs: []*mvccpb.KeyValue{ + { + name: "first put 2 kvDocs, then delete kvId=1, cache should store 1 kvDocs", + args: args{&etcdadpt.Response{Action: etcdadpt.ActionDelete, Kvs: []*mvccpb.KeyValue{ {Value: []byte(`{"id":"1", "key":"withFruit", "value":"no", "labels":{"environment":"testing"}}`)}, }}}, - 1, + want: 1, }, - {"first put 2 kvDocs, then delete kvId=1 and kvId=2, cache should store 0 kvDocs", - args{&etcdadpt.Response{Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{ + { + name: "first put 2 kvDocs, then delete kvId=1 and kvId=2, cache should store 0 kvDocs", + args: args{&etcdadpt.Response{Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{ {Value: []byte(`{"id":"1", "key":"withFruit", "value":"no", "labels":{"environment":"testing"}}`)}, {Value: []byte(`{"id":"2", "key":"withToys", "value":"yes", "labels":{"environment":"testing"}}`)}, }}}, - 0, + want: 0, }, - {"first put 2 kvDocs, then delete non-exist kvId=0, cache should store 2 kvDocs", - args{&etcdadpt.Response{Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{ + { + name: "first put 2 kvDocs, then delete non-exist kvId=0, cache should store 2 kvDocs", + args: args{&etcdadpt.Response{Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{ {Value: []byte(`{"id":"0", "key":"withFruit", "value":"no", "labels":{"environment":"testing"}}`)}, }}}, - 2, + want: 2, }, } for _, tt := range tests { @@ -96,3 +124,193 @@ func TestCacheDelete(t *testing.T) { }) } } + +func TestWatchCallBack(t *testing.T) { + type args struct { + rsp []*etcdadpt.Response + } + type want struct { + kvNum int + err error + } + tests := []struct { + name string + args args + want want + }{ + { + name: "receive 2 messages without kvs, expected: error is not nil, cache should store 0 kvDoc", + args: args{ + rsp: []*etcdadpt.Response{ + { + Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{}, + }, + + { + Action: etcdadpt.ActionDelete, Kvs: []*mvccpb.KeyValue{}, + }, + }, + }, + want: want{ + kvNum: 0, + err: fmt.Errorf("unknown event"), + }, + }, + { + name: "receive 1 put message, put 0 kvDoc, expected: error is not nil, cache should store 0 kvDoc", + args: args{ + rsp: []*etcdadpt.Response{ + { + Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{}, + }, + }, + }, + want: want{ + kvNum: 0, + err: fmt.Errorf("unknown event"), + }, + }, + { + name: "receive 1 delete message, delete 0 kvDoc, expected: error is not nil, cache should store 0 kvDoc", + args: args{ + rsp: []*etcdadpt.Response{{Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{}}}, + }, + want: want{ + kvNum: 0, + err: fmt.Errorf("unknown event"), + }, + }, + { + name: "receive put message, put 1 kvDocs, expected: error is nil, cache should store 1 kvDocs", + args: args{ + rsp: []*etcdadpt.Response{ + { + Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{{Value: []byte(`{"id":"1", "key":"withFruit", "value":"no", "labels":{"environment":"testing"}}`)}}}, + }, + }, + want: want{ + kvNum: 1, + err: nil, + }, + }, + { + name: "receive 1 put message, 1 delete message, first put 1 kvDoc, then delete it, expected: error is nil, cache should store 0 kvDoc", + args: args{ + rsp: []*etcdadpt.Response{ + { + Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{{Value: []byte(`{"id":"1", "key":"withFruit", "value":"no", "labels":{"environment":"testing"}}`)}}, + }, + { + Action: etcdadpt.ActionDelete, Kvs: []*mvccpb.KeyValue{{Value: []byte(`{"id":"1", "key":"withFruit", "value":"no", "labels":{"environment":"testing"}}`)}}, + }, + }, + }, + want: want{ + kvNum: 0, + err: nil, + }, + }, + { + name: "receive put message put 1 kvDoc, but labels are not cached, cache should store 0 kvDoc", + args: args{ + []*etcdadpt.Response{ + { + Action: etcdadpt.ActionPut, Kvs: []*mvccpb.KeyValue{{Value: []byte(`{"id":"1", "key":"withFruit", "value":"no", "labels":{"env":"testing"}}`)}}, + }, + }, + }, + want: want{ + kvNum: 0, + err: nil, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + kc := NewKvCache() + for _, rsp := range tt.args.rsp { + err := kc.watchCallBack("", rsp) + assert.Equal(t, tt.want.err, err) + } + num := kc.kvDocCache.ItemCount() + assert.Equal(t, tt.want.kvNum, num) + }) + } +} + +func TestStoreAndLoadKvDoc(t *testing.T) { + type want struct { + kvDoc *model.KVDoc + exist bool + } + type args struct { + kvID string + kvDoc *model.KVDoc + expireTime time.Duration + waitTimeAfterStore time.Duration + } + tests := []struct { + name string + args args + want want + }{ + { + name: "store 1 kv and the expire time is 1 seconds, then load the kv with no wait time, expect: load kv successfully", + args: args{ + kvID: "", + kvDoc: &model.KVDoc{ + ID: "1", + Key: "withFood", + Value: "yes", + Labels: map[string]string{ + "environment": "testing", + }, + }, + expireTime: 1 * time.Second, + waitTimeAfterStore: 0, + }, + want: want{ + kvDoc: &model.KVDoc{ + ID: "1", + Key: "withFood", + Value: "yes", + Labels: map[string]string{ + "environment": "testing", + }, + }, + exist: true, + }, + }, + { + name: "store 1 kv and the expire time is 1 seconds, after waiting 2 seconds, then load the kv, expect: unable to load the kv", + args: args{ + kvID: "", + kvDoc: &model.KVDoc{ + ID: "1", + Key: "withFood", + Value: "yes", + Labels: map[string]string{ + "environment": "testing", + }, + }, + expireTime: 1 * time.Second, + waitTimeAfterStore: 2 * time.Second, + }, + want: want{ + kvDoc: nil, + exist: false, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + kc := NewKvCache() + kc.kvDocCache = goCache.New(tt.args.expireTime, tt.args.expireTime) + kc.StoreKvDoc(tt.args.kvID, tt.args.kvDoc) + time.Sleep(tt.args.waitTimeAfterStore) + doc, exist := kc.LoadKvDoc(tt.args.kvID) + assert.Equal(t, tt.want.exist, exist) + reflect.DeepEqual(tt.want.kvDoc, doc) + }) + } +}