Skip to content

Commit

Permalink
[fix] fix inconsistency bug between cache layer and etcd. (apache#287)
Browse files Browse the repository at this point in the history
* [fix] fix inconsistency bug between cache layer and etcd.

* [fix] modify review comments.

* [fix] add ut for kv_cache.go.
  • Loading branch information
kkf1 authored Mar 29, 2023
1 parent a105b22 commit e005d80
Show file tree
Hide file tree
Showing 4 changed files with 346 additions and 95 deletions.
8 changes: 7 additions & 1 deletion examples/dev/kie-conf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,10 @@ db:
# rsaPublicKeyFile: ./examples/dev/public.key
sync:
# turn on the synchronization switch related operations will be written to the task in the db
enabled: false
enabled: false
#cache:
# labels:
# - environment
# - service
# - app
# - version
11 changes: 8 additions & 3 deletions server/config/struct.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ package config

// Config is yaml file struct
type Config struct {
DB DB `yaml:"db"`
RBAC RBAC `yaml:"rbac"`
Sync Sync `yaml:"sync"`
DB DB `yaml:"db"`
RBAC RBAC `yaml:"rbac"`
Sync Sync `yaml:"sync"`
Cache Cache `yaml:"cache"`
//config from cli
ConfigFile string
NodeName string
Expand Down Expand Up @@ -59,3 +60,7 @@ type RBAC struct {
type Sync struct {
Enabled bool `yaml:"enabled"`
}

type Cache struct {
Labels []string `yaml:"labels"`
}
152 changes: 87 additions & 65 deletions server/datasource/etcd/kv/kv_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/apache/servicecomb-kie/pkg/model"
"github.com/apache/servicecomb-kie/pkg/stringutil"
"github.com/apache/servicecomb-kie/server/config"
"github.com/apache/servicecomb-kie/server/datasource"
"github.com/apache/servicecomb-kie/server/datasource/etcd/key"
"github.com/go-chassis/foundation/backoff"
Expand All @@ -37,33 +38,41 @@ const (

type IDSet map[string]struct{}

type Cache struct {
timeOut time.Duration
client etcdadpt.Client
revision int64
kvIDCache sync.Map
kvDocCache *goCache.Cache
type LabelsSet map[string]struct{}

type CacheSearchReq struct {
Domain string
Project string
Opts *datasource.FindOptions
Regex *regexp.Regexp
}

func NewKvCache() *Cache {
kvDocCache := goCache.New(cacheExpirationTime, cacheCleanupInterval)
labelsSet := LabelsSet{}
for _, label := range config.Configurations.Cache.Labels {
labelsSet[label] = struct{}{}
}
return &Cache{
timeOut: etcdWatchTimeout,
client: etcdadpt.Instance(),
revision: 0,
kvDocCache: kvDocCache,
labelsSet: labelsSet,
}
}

func Enabled() bool {
return kvCache != nil
}

type CacheSearchReq struct {
Domain string
Project string
Opts *datasource.FindOptions
Regex *regexp.Regexp
type Cache struct {
timeOut time.Duration
client etcdadpt.Client
revision int64
kvIDCache sync.Map
kvDocCache *goCache.Cache
labelsSet LabelsSet
}

func (kc *Cache) Refresh(ctx context.Context) {
Expand Down Expand Up @@ -130,7 +139,7 @@ func (kc *Cache) list(ctx context.Context) (*etcdadpt.Response, error) {
return rsp, nil
}

func (kc *Cache) watchCallBack(message string, rsp *etcdadpt.Response) error {
func (kc *Cache) watchCallBack(_ string, rsp *etcdadpt.Response) error {
if rsp == nil || len(rsp.Kvs) == 0 {
return fmt.Errorf("unknown event")
}
Expand All @@ -154,6 +163,9 @@ func (kc *Cache) cachePut(rsp *etcdadpt.Response) {
openlog.Error(fmt.Sprintf("failed to unmarshal kv, err %v", err))
continue
}
if !kc.isInLabelsSet(kvDoc.Labels) {
continue
}
kc.StoreKvDoc(kvDoc.ID, kvDoc)
cacheKey := kc.GetCacheKey(kvDoc.Domain, kvDoc.Project, kvDoc.Labels)
m, ok := kc.LoadKvIDSet(cacheKey)
Expand Down Expand Up @@ -220,46 +232,6 @@ func (kc *Cache) DeleteKvDoc(kvID string) {
kc.kvDocCache.Delete(kvID)
}

func Search(ctx context.Context, req *CacheSearchReq) (*model.KVResponse, bool, error) {
if !req.Opts.ExactLabels {
return nil, false, nil
}

openlog.Debug(fmt.Sprintf("using cache to search kv, domain %v, project %v, opts %+v", req.Domain, req.Project, *req.Opts))
result := &model.KVResponse{
Data: []*model.KVDoc{},
}
cacheKey := kvCache.GetCacheKey(req.Domain, req.Project, req.Opts.Labels)
kvIds, ok := kvCache.LoadKvIDSet(cacheKey)
if !ok {
kvCache.StoreKvIDSet(cacheKey, IDSet{})
return result, true, nil
}

var docs []*model.KVDoc

var kvIdsLeft []string
for kvID := range kvIds {
if doc, ok := kvCache.LoadKvDoc(kvID); ok {
docs = append(docs, doc)
continue
}
kvIdsLeft = append(kvIdsLeft, kvID)
}

tpData := kvCache.getKvFromEtcd(ctx, req, kvIdsLeft)
docs = append(docs, tpData...)

for _, doc := range docs {
if isMatch(req, doc) {
datasource.ClearPart(doc)
result.Data = append(result.Data, doc)
}
}
result.Total = len(result.Data)
return result, true, nil
}

func (kc *Cache) getKvFromEtcd(ctx context.Context, req *CacheSearchReq, kvIdsLeft []string) []*model.KVDoc {
if len(kvIdsLeft) == 0 {
return nil
Expand Down Expand Up @@ -294,19 +266,6 @@ func (kc *Cache) getKvFromEtcd(ctx context.Context, req *CacheSearchReq, kvIdsLe
return docs
}

func isMatch(req *CacheSearchReq, doc *model.KVDoc) bool {
if doc == nil {
return false
}
if req.Opts.Status != "" && doc.Status != req.Opts.Status {
return false
}
if req.Regex != nil && !req.Regex.MatchString(doc.Key) {
return false
}
return true
}

func (kc *Cache) GetKvDoc(kv *mvccpb.KeyValue) (*model.KVDoc, error) {
kvDoc := &model.KVDoc{}
err := json.Unmarshal(kv.Value, kvDoc)
Expand All @@ -326,3 +285,66 @@ func (kc *Cache) GetCacheKey(domain, project string, labels map[string]string) s
}, "/")
return inputKey
}

func (kc *Cache) isInLabelsSet(Labels map[string]string) bool {
for label := range Labels {
if _, ok := kc.labelsSet[label]; !ok {
return false
}
}
return true
}

func Search(ctx context.Context, req *CacheSearchReq) (*model.KVResponse, bool, error) {
result := &model.KVResponse{
Data: []*model.KVDoc{},
}
if !req.Opts.ExactLabels || !kvCache.isInLabelsSet(req.Opts.Labels) {
return result, false, nil
}

openlog.Debug(fmt.Sprintf("using cache to search kv, domain %v, project %v, opts %+v", req.Domain, req.Project, *req.Opts))
cacheKey := kvCache.GetCacheKey(req.Domain, req.Project, req.Opts.Labels)

kvIds, ok := kvCache.LoadKvIDSet(cacheKey)
if !ok {
kvCache.StoreKvIDSet(cacheKey, IDSet{})
return result, true, nil
}

var docs []*model.KVDoc

var kvIdsLeft []string
for kvID := range kvIds {
if doc, ok := kvCache.LoadKvDoc(kvID); ok {
docs = append(docs, doc)
continue
}
kvIdsLeft = append(kvIdsLeft, kvID)
}

tpData := kvCache.getKvFromEtcd(ctx, req, kvIdsLeft)
docs = append(docs, tpData...)

for _, doc := range docs {
if isMatch(req, doc) {
datasource.ClearPart(doc)
result.Data = append(result.Data, doc)
}
}
result.Total = len(result.Data)
return result, true, nil
}

func isMatch(req *CacheSearchReq, doc *model.KVDoc) bool {
if doc == nil {
return false
}
if req.Opts.Status != "" && doc.Status != req.Opts.Status {
return false
}
if req.Regex != nil && !req.Regex.MatchString(doc.Key) {
return false
}
return true
}
Loading

0 comments on commit e005d80

Please sign in to comment.