Skip to content

Commit

Permalink
修复wait长查询场景下配置项发生变化,更新缓存失败 (#294)
Browse files Browse the repository at this point in the history
* 修复kie日志打印配置信息

* 修复健康检查被限流住

* 修复watch场景下配置项长时间不改变且持续调用而发生的内存溢出问题

* 修复watch场景下配置项长时间不改变且持续调用而发生的内存溢出问题

* 修复wait长查询时配置发生变化场景下,更新缓存失效

* 修复wait长查询时配置发生变化场景下,更新缓存失效

* 修复wait长查询时配置发生变化场景下,更新缓存失效

* 修复wait长查询时配置发生变化场景下,更新缓存失效

* 修复wait长查询时配置发生变化场景下,更新缓存失效

---------

Co-authored-by: songshiyuan 00649746 <[email protected]>
  • Loading branch information
tornado-ssy and songshiyuan 00649746 authored Aug 28, 2023
1 parent 0e43daf commit 3baf916
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 24 deletions.
22 changes: 0 additions & 22 deletions server/pubsub/notifier/kv.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
package notifier

import (
"context"
"sync"
"time"

"github.com/apache/servicecomb-kie/pkg/model"
"github.com/apache/servicecomb-kie/server/cache"
"github.com/apache/servicecomb-kie/server/pubsub"
kvsvc "github.com/apache/servicecomb-kie/server/service/kv"
"github.com/go-chassis/openlog"
"github.com/hashicorp/serf/serf"
)
Expand Down Expand Up @@ -75,30 +71,12 @@ func (h *KVHandler) FindTopicAndFire(ke *pubsub.KVChangeEvent) {
return true
}
if t.Match(ke) {
prepareCache(key.(string), t)
notifyAndRemoveObservers(value, ke)
}
return true
})
}

func prepareCache(topicName string, topic *pubsub.Topic) {
rev, kvs, err := kvsvc.ListKV(context.TODO(), &model.ListKVRequest{
Domain: topic.DomainID,
Project: topic.Project,
Labels: topic.Labels,
Match: topic.MatchType,
})
if err != nil {
openlog.Error("can not query kvs:" + err.Error())
}
cache.CachedKV().Write(topicName, &cache.DBResult{
KVs: kvs,
Rev: rev,
Err: err,
})
}

func notifyAndRemoveObservers(value interface{}, ke *pubsub.KVChangeEvent) {
observers := value.(*sync.Map)
observers.Range(func(id, value interface{}) bool {
Expand Down
20 changes: 19 additions & 1 deletion server/resource/v1/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func getMatchPattern(rctx *restful.Context) string {
}
return m
}
func eventHappened(waitStr string, topic *pubsub.Topic) (bool, string, error) {
func eventHappened(waitStr string, topic *pubsub.Topic, ctx context.Context) (bool, string, error) {
d, err := time.ParseDuration(waitStr)
if err != nil || d > common.MaxWait {
return false, "", errors.New(common.MsgInvalidWait)
Expand All @@ -230,6 +230,7 @@ func eventHappened(waitStr string, topic *pubsub.Topic) (bool, string, error) {
happened = false
pubsub.RemoveObserver(o.UUID, topic)
case <-o.Event:
prepareCache(topicName, topic, ctx)
}
return happened, topicName, nil
}
Expand Down Expand Up @@ -307,3 +308,20 @@ func queryAndResponse(rctx *restful.Context, request *model.ListKVRequest) {
openlog.Error(err.Error())
}
}

func prepareCache(topicName string, topic *pubsub.Topic, ctx context.Context) {
rev, kvs, err := kvsvc.ListKV(ctx, &model.ListKVRequest{
Domain: topic.DomainID,
Project: topic.Project,
Labels: topic.Labels,
Match: topic.MatchType,
})
if err != nil {
openlog.Error("can not query kvs:" + err.Error())
}
cache.CachedKV().Write(topicName, &cache.DBResult{
KVs: kvs,
Rev: rev,
Err: err,
})
}
2 changes: 1 addition & 1 deletion server/resource/v1/kv_resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ func watch(rctx *restful.Context, request *model.ListKVRequest, wait string) boo
Project: request.Project,
MatchType: request.Match,
DomainID: request.Domain,
})
}, rctx.Ctx)
if err != nil {
WriteErrResponse(rctx, config.ErrObserveEvent, err.Error())
return true
Expand Down

0 comments on commit 3baf916

Please sign in to comment.