Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix] fix inconsistency bug between cache layer and etcd. #287

Merged
merged 3 commits into from
Mar 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion examples/dev/kie-conf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,10 @@ db:
# rsaPublicKeyFile: ./examples/dev/public.key
sync:
# turn on the synchronization switch related operations will be written to the task in the db
enabled: false
enabled: false
#cache:
# labels:
# - environment
# - service
# - app
# - version
11 changes: 8 additions & 3 deletions server/config/struct.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ package config

// Config is yaml file struct
type Config struct {
DB DB `yaml:"db"`
RBAC RBAC `yaml:"rbac"`
Sync Sync `yaml:"sync"`
DB DB `yaml:"db"`
RBAC RBAC `yaml:"rbac"`
Sync Sync `yaml:"sync"`
Cache Cache `yaml:"cache"`
//config from cli
ConfigFile string
NodeName string
Expand Down Expand Up @@ -59,3 +60,7 @@ type RBAC struct {
type Sync struct {
Enabled bool `yaml:"enabled"`
}

type Cache struct {
Labels []string `yaml:"labels"`
}
152 changes: 87 additions & 65 deletions server/datasource/etcd/kv/kv_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/apache/servicecomb-kie/pkg/model"
"github.com/apache/servicecomb-kie/pkg/stringutil"
"github.com/apache/servicecomb-kie/server/config"
"github.com/apache/servicecomb-kie/server/datasource"
"github.com/apache/servicecomb-kie/server/datasource/etcd/key"
"github.com/go-chassis/foundation/backoff"
Expand All @@ -37,33 +38,41 @@ const (

type IDSet map[string]struct{}

type Cache struct {
timeOut time.Duration
client etcdadpt.Client
revision int64
kvIDCache sync.Map
kvDocCache *goCache.Cache
type LabelsSet map[string]struct{}

type CacheSearchReq struct {
Domain string
Project string
Opts *datasource.FindOptions
Regex *regexp.Regexp
}

func NewKvCache() *Cache {
kvDocCache := goCache.New(cacheExpirationTime, cacheCleanupInterval)
labelsSet := LabelsSet{}
for _, label := range config.Configurations.Cache.Labels {
labelsSet[label] = struct{}{}
}
return &Cache{
timeOut: etcdWatchTimeout,
client: etcdadpt.Instance(),
revision: 0,
kvDocCache: kvDocCache,
labelsSet: labelsSet,
}
}

func Enabled() bool {
return kvCache != nil
}

type CacheSearchReq struct {
Domain string
Project string
Opts *datasource.FindOptions
Regex *regexp.Regexp
type Cache struct {
timeOut time.Duration
client etcdadpt.Client
revision int64
kvIDCache sync.Map
kvDocCache *goCache.Cache
labelsSet LabelsSet
}

func (kc *Cache) Refresh(ctx context.Context) {
Expand Down Expand Up @@ -130,7 +139,7 @@ func (kc *Cache) list(ctx context.Context) (*etcdadpt.Response, error) {
return rsp, nil
}

func (kc *Cache) watchCallBack(message string, rsp *etcdadpt.Response) error {
func (kc *Cache) watchCallBack(_ string, rsp *etcdadpt.Response) error {
if rsp == nil || len(rsp.Kvs) == 0 {
return fmt.Errorf("unknown event")
}
Expand All @@ -154,6 +163,9 @@ func (kc *Cache) cachePut(rsp *etcdadpt.Response) {
openlog.Error(fmt.Sprintf("failed to unmarshal kv, err %v", err))
continue
}
if !kc.isInLabelsSet(kvDoc.Labels) {
continue
}
kc.StoreKvDoc(kvDoc.ID, kvDoc)
cacheKey := kc.GetCacheKey(kvDoc.Domain, kvDoc.Project, kvDoc.Labels)
m, ok := kc.LoadKvIDSet(cacheKey)
Expand Down Expand Up @@ -220,46 +232,6 @@ func (kc *Cache) DeleteKvDoc(kvID string) {
kc.kvDocCache.Delete(kvID)
}

func Search(ctx context.Context, req *CacheSearchReq) (*model.KVResponse, bool, error) {
if !req.Opts.ExactLabels {
return nil, false, nil
}

openlog.Debug(fmt.Sprintf("using cache to search kv, domain %v, project %v, opts %+v", req.Domain, req.Project, *req.Opts))
result := &model.KVResponse{
Data: []*model.KVDoc{},
}
cacheKey := kvCache.GetCacheKey(req.Domain, req.Project, req.Opts.Labels)
kvIds, ok := kvCache.LoadKvIDSet(cacheKey)
if !ok {
kvCache.StoreKvIDSet(cacheKey, IDSet{})
return result, true, nil
}

var docs []*model.KVDoc

var kvIdsLeft []string
for kvID := range kvIds {
if doc, ok := kvCache.LoadKvDoc(kvID); ok {
docs = append(docs, doc)
continue
}
kvIdsLeft = append(kvIdsLeft, kvID)
}

tpData := kvCache.getKvFromEtcd(ctx, req, kvIdsLeft)
docs = append(docs, tpData...)

for _, doc := range docs {
if isMatch(req, doc) {
datasource.ClearPart(doc)
result.Data = append(result.Data, doc)
}
}
result.Total = len(result.Data)
return result, true, nil
}

func (kc *Cache) getKvFromEtcd(ctx context.Context, req *CacheSearchReq, kvIdsLeft []string) []*model.KVDoc {
if len(kvIdsLeft) == 0 {
return nil
Expand Down Expand Up @@ -294,19 +266,6 @@ func (kc *Cache) getKvFromEtcd(ctx context.Context, req *CacheSearchReq, kvIdsLe
return docs
}

func isMatch(req *CacheSearchReq, doc *model.KVDoc) bool {
if doc == nil {
return false
}
if req.Opts.Status != "" && doc.Status != req.Opts.Status {
return false
}
if req.Regex != nil && !req.Regex.MatchString(doc.Key) {
return false
}
return true
}

func (kc *Cache) GetKvDoc(kv *mvccpb.KeyValue) (*model.KVDoc, error) {
kvDoc := &model.KVDoc{}
err := json.Unmarshal(kv.Value, kvDoc)
Expand All @@ -326,3 +285,66 @@ func (kc *Cache) GetCacheKey(domain, project string, labels map[string]string) s
}, "/")
return inputKey
}

func (kc *Cache) isInLabelsSet(Labels map[string]string) bool {
for label := range Labels {
if _, ok := kc.labelsSet[label]; !ok {
return false
}
}
return true
}

func Search(ctx context.Context, req *CacheSearchReq) (*model.KVResponse, bool, error) {
result := &model.KVResponse{
Data: []*model.KVDoc{},
}
if !req.Opts.ExactLabels || !kvCache.isInLabelsSet(req.Opts.Labels) {
return result, false, nil
}

openlog.Debug(fmt.Sprintf("using cache to search kv, domain %v, project %v, opts %+v", req.Domain, req.Project, *req.Opts))
cacheKey := kvCache.GetCacheKey(req.Domain, req.Project, req.Opts.Labels)

kvIds, ok := kvCache.LoadKvIDSet(cacheKey)
if !ok {
kvCache.StoreKvIDSet(cacheKey, IDSet{})
return result, true, nil
}

var docs []*model.KVDoc

var kvIdsLeft []string
for kvID := range kvIds {
if doc, ok := kvCache.LoadKvDoc(kvID); ok {
docs = append(docs, doc)
continue
}
kvIdsLeft = append(kvIdsLeft, kvID)
}

tpData := kvCache.getKvFromEtcd(ctx, req, kvIdsLeft)
docs = append(docs, tpData...)

for _, doc := range docs {
if isMatch(req, doc) {
datasource.ClearPart(doc)
result.Data = append(result.Data, doc)
}
}
result.Total = len(result.Data)
return result, true, nil
}

func isMatch(req *CacheSearchReq, doc *model.KVDoc) bool {
if doc == nil {
return false
}
if req.Opts.Status != "" && doc.Status != req.Opts.Status {
return false
}
if req.Regex != nil && !req.Regex.MatchString(doc.Key) {
return false
}
return true
}
Loading