From e3e25f27801eb55ff7e513828e8a297d7f2b5e58 Mon Sep 17 00:00:00 2001 From: kkf1 <2644863636@qq.com> Date: Mon, 27 Mar 2023 17:59:43 +0800 Subject: [PATCH] [fix] fix inconsistency bug between cache layer and etcd. --- examples/dev/kie-conf.yaml | 8 +- server/config/struct.go | 11 +- server/datasource/etcd/kv/kv_cache.go | 150 ++++++++++++--------- server/datasource/etcd/kv/kv_cache_test.go | 5 + 4 files changed, 106 insertions(+), 68 deletions(-) diff --git a/examples/dev/kie-conf.yaml b/examples/dev/kie-conf.yaml index 11b91067..d60c9a20 100644 --- a/examples/dev/kie-conf.yaml +++ b/examples/dev/kie-conf.yaml @@ -18,4 +18,10 @@ db: # rsaPublicKeyFile: ./examples/dev/public.key sync: # turn on the synchronization switch related operations will be written to the task in the db - enabled: false \ No newline at end of file + enabled: false +#cacheLabels: +# labelsArray: +# - environment +# - service +# - app +# - version diff --git a/server/config/struct.go b/server/config/struct.go index 83e91f08..4a55c2ee 100644 --- a/server/config/struct.go +++ b/server/config/struct.go @@ -19,9 +19,10 @@ package config // Config is yaml file struct type Config struct { - DB DB `yaml:"db"` - RBAC RBAC `yaml:"rbac"` - Sync Sync `yaml:"sync"` + DB DB `yaml:"db"` + RBAC RBAC `yaml:"rbac"` + Sync Sync `yaml:"sync"` + CacheLabels CacheLabels `yaml:"cacheLabels"` //config from cli ConfigFile string NodeName string @@ -59,3 +60,7 @@ type RBAC struct { type Sync struct { Enabled bool `yaml:"enabled"` } + +type CacheLabels struct { + LabelsArray []string `yaml:"labelsArray"` +} diff --git a/server/datasource/etcd/kv/kv_cache.go b/server/datasource/etcd/kv/kv_cache.go index 5776bf73..88bcc80e 100644 --- a/server/datasource/etcd/kv/kv_cache.go +++ b/server/datasource/etcd/kv/kv_cache.go @@ -11,6 +11,7 @@ import ( "github.com/apache/servicecomb-kie/pkg/model" "github.com/apache/servicecomb-kie/pkg/stringutil" + "github.com/apache/servicecomb-kie/server/config" "github.com/apache/servicecomb-kie/server/datasource" "github.com/apache/servicecomb-kie/server/datasource/etcd/key" "github.com/go-chassis/foundation/backoff" @@ -37,21 +38,27 @@ const ( type IDSet map[string]struct{} -type Cache struct { - timeOut time.Duration - client etcdadpt.Client - revision int64 - kvIDCache sync.Map - kvDocCache *goCache.Cache +type LabelsSet map[string]struct{} + +type CacheSearchReq struct { + Domain string + Project string + Opts *datasource.FindOptions + Regex *regexp.Regexp } func NewKvCache() *Cache { kvDocCache := goCache.New(cacheExpirationTime, cacheCleanupInterval) + labelsSet := LabelsSet{} + for _, label := range config.Configurations.CacheLabels.LabelsArray { + labelsSet[label] = struct{}{} + } return &Cache{ timeOut: etcdWatchTimeout, client: etcdadpt.Instance(), revision: 0, kvDocCache: kvDocCache, + labelsSet: labelsSet, } } @@ -59,11 +66,13 @@ func Enabled() bool { return kvCache != nil } -type CacheSearchReq struct { - Domain string - Project string - Opts *datasource.FindOptions - Regex *regexp.Regexp +type Cache struct { + timeOut time.Duration + client etcdadpt.Client + revision int64 + kvIDCache sync.Map + kvDocCache *goCache.Cache + labelsSet LabelsSet } func (kc *Cache) Refresh(ctx context.Context) { @@ -154,6 +163,9 @@ func (kc *Cache) cachePut(rsp *etcdadpt.Response) { openlog.Error(fmt.Sprintf("failed to unmarshal kv, err %v", err)) continue } + if !kc.isInLabelsSet(kvDoc.Labels) { + continue + } kc.StoreKvDoc(kvDoc.ID, kvDoc) cacheKey := kc.GetCacheKey(kvDoc.Domain, kvDoc.Project, kvDoc.Labels) m, ok := kc.LoadKvIDSet(cacheKey) @@ -220,46 +232,6 @@ func (kc *Cache) DeleteKvDoc(kvID string) { kc.kvDocCache.Delete(kvID) } -func Search(ctx context.Context, req *CacheSearchReq) (*model.KVResponse, bool, error) { - if !req.Opts.ExactLabels { - return nil, false, nil - } - - openlog.Debug(fmt.Sprintf("using cache to search kv, domain %v, project %v, opts %+v", req.Domain, req.Project, *req.Opts)) - result := &model.KVResponse{ - Data: []*model.KVDoc{}, - } - cacheKey := kvCache.GetCacheKey(req.Domain, req.Project, req.Opts.Labels) - kvIds, ok := kvCache.LoadKvIDSet(cacheKey) - if !ok { - kvCache.StoreKvIDSet(cacheKey, IDSet{}) - return result, true, nil - } - - var docs []*model.KVDoc - - var kvIdsLeft []string - for kvID := range kvIds { - if doc, ok := kvCache.LoadKvDoc(kvID); ok { - docs = append(docs, doc) - continue - } - kvIdsLeft = append(kvIdsLeft, kvID) - } - - tpData := kvCache.getKvFromEtcd(ctx, req, kvIdsLeft) - docs = append(docs, tpData...) - - for _, doc := range docs { - if isMatch(req, doc) { - datasource.ClearPart(doc) - result.Data = append(result.Data, doc) - } - } - result.Total = len(result.Data) - return result, true, nil -} - func (kc *Cache) getKvFromEtcd(ctx context.Context, req *CacheSearchReq, kvIdsLeft []string) []*model.KVDoc { if len(kvIdsLeft) == 0 { return nil @@ -294,19 +266,6 @@ func (kc *Cache) getKvFromEtcd(ctx context.Context, req *CacheSearchReq, kvIdsLe return docs } -func isMatch(req *CacheSearchReq, doc *model.KVDoc) bool { - if doc == nil { - return false - } - if req.Opts.Status != "" && doc.Status != req.Opts.Status { - return false - } - if req.Regex != nil && !req.Regex.MatchString(doc.Key) { - return false - } - return true -} - func (kc *Cache) GetKvDoc(kv *mvccpb.KeyValue) (*model.KVDoc, error) { kvDoc := &model.KVDoc{} err := json.Unmarshal(kv.Value, kvDoc) @@ -326,3 +285,66 @@ func (kc *Cache) GetCacheKey(domain, project string, labels map[string]string) s }, "/") return inputKey } + +func (kc *Cache) isInLabelsSet(Labels map[string]string) bool { + for label := range Labels { + if _, ok := kc.labelsSet[label]; !ok { + return false + } + } + return true +} + +func Search(ctx context.Context, req *CacheSearchReq) (*model.KVResponse, bool, error) { + result := &model.KVResponse{ + Data: []*model.KVDoc{}, + } + if !req.Opts.ExactLabels || !kvCache.isInLabelsSet(req.Opts.Labels) { + return result, false, nil + } + + openlog.Debug(fmt.Sprintf("using cache to search kv, domain %v, project %v, opts %+v", req.Domain, req.Project, *req.Opts)) + cacheKey := kvCache.GetCacheKey(req.Domain, req.Project, req.Opts.Labels) + + kvIds, ok := kvCache.LoadKvIDSet(cacheKey) + if !ok { + kvCache.StoreKvIDSet(cacheKey, IDSet{}) + return result, true, nil + } + + var docs []*model.KVDoc + + var kvIdsLeft []string + for kvID := range kvIds { + if doc, ok := kvCache.LoadKvDoc(kvID); ok { + docs = append(docs, doc) + continue + } + kvIdsLeft = append(kvIdsLeft, kvID) + } + + tpData := kvCache.getKvFromEtcd(ctx, req, kvIdsLeft) + docs = append(docs, tpData...) + + for _, doc := range docs { + if isMatch(req, doc) { + datasource.ClearPart(doc) + result.Data = append(result.Data, doc) + } + } + result.Total = len(result.Data) + return result, true, nil +} + +func isMatch(req *CacheSearchReq, doc *model.KVDoc) bool { + if doc == nil { + return false + } + if req.Opts.Status != "" && doc.Status != req.Opts.Status { + return false + } + if req.Regex != nil && !req.Regex.MatchString(doc.Key) { + return false + } + return true +} diff --git a/server/datasource/etcd/kv/kv_cache_test.go b/server/datasource/etcd/kv/kv_cache_test.go index 22866996..ef72eb6f 100644 --- a/server/datasource/etcd/kv/kv_cache_test.go +++ b/server/datasource/etcd/kv/kv_cache_test.go @@ -3,11 +3,16 @@ package kv import ( "testing" + "github.com/apache/servicecomb-kie/server/config" "github.com/little-cui/etcdadpt" "github.com/stretchr/testify/assert" "go.etcd.io/etcd/api/v3/mvccpb" ) +func init() { + config.Configurations.CacheLabels.LabelsArray = []string{"environment", "service", "app", "version"} +} + type args struct { rsp *etcdadpt.Response }