diff --git a/server/pubsub/notifier/kv.go b/server/pubsub/notifier/kv.go index 889941db..5ea63fc7 100644 --- a/server/pubsub/notifier/kv.go +++ b/server/pubsub/notifier/kv.go @@ -1,14 +1,10 @@ package notifier import ( - "context" "sync" "time" - "github.com/apache/servicecomb-kie/pkg/model" - "github.com/apache/servicecomb-kie/server/cache" "github.com/apache/servicecomb-kie/server/pubsub" - kvsvc "github.com/apache/servicecomb-kie/server/service/kv" "github.com/go-chassis/openlog" "github.com/hashicorp/serf/serf" ) @@ -75,30 +71,12 @@ func (h *KVHandler) FindTopicAndFire(ke *pubsub.KVChangeEvent) { return true } if t.Match(ke) { - prepareCache(key.(string), t) notifyAndRemoveObservers(value, ke) } return true }) } -func prepareCache(topicName string, topic *pubsub.Topic) { - rev, kvs, err := kvsvc.ListKV(context.TODO(), &model.ListKVRequest{ - Domain: topic.DomainID, - Project: topic.Project, - Labels: topic.Labels, - Match: topic.MatchType, - }) - if err != nil { - openlog.Error("can not query kvs:" + err.Error()) - } - cache.CachedKV().Write(topicName, &cache.DBResult{ - KVs: kvs, - Rev: rev, - Err: err, - }) -} - func notifyAndRemoveObservers(value interface{}, ke *pubsub.KVChangeEvent) { observers := value.(*sync.Map) observers.Range(func(id, value interface{}) bool { diff --git a/server/resource/v1/common.go b/server/resource/v1/common.go index a3d8f0a8..565d0a7e 100644 --- a/server/resource/v1/common.go +++ b/server/resource/v1/common.go @@ -210,7 +210,7 @@ func getMatchPattern(rctx *restful.Context) string { } return m } -func eventHappened(waitStr string, topic *pubsub.Topic) (bool, string, error) { +func eventHappened(waitStr string, topic *pubsub.Topic, ctx context.Context) (bool, string, error) { d, err := time.ParseDuration(waitStr) if err != nil || d > common.MaxWait { return false, "", errors.New(common.MsgInvalidWait) @@ -230,6 +230,7 @@ func eventHappened(waitStr string, topic *pubsub.Topic) (bool, string, error) { happened = false pubsub.RemoveObserver(o.UUID, topic) case <-o.Event: + prepareCache(topicName, topic, ctx) } return happened, topicName, nil } @@ -307,3 +308,20 @@ func queryAndResponse(rctx *restful.Context, request *model.ListKVRequest) { openlog.Error(err.Error()) } } + +func prepareCache(topicName string, topic *pubsub.Topic, ctx context.Context) { + rev, kvs, err := kvsvc.ListKV(ctx, &model.ListKVRequest{ + Domain: topic.DomainID, + Project: topic.Project, + Labels: topic.Labels, + Match: topic.MatchType, + }) + if err != nil { + openlog.Error("can not query kvs:" + err.Error()) + } + cache.CachedKV().Write(topicName, &cache.DBResult{ + KVs: kvs, + Rev: rev, + Err: err, + }) +} diff --git a/server/resource/v1/kv_resource.go b/server/resource/v1/kv_resource.go index e75b5052..0f259d2c 100644 --- a/server/resource/v1/kv_resource.go +++ b/server/resource/v1/kv_resource.go @@ -267,7 +267,7 @@ func watch(rctx *restful.Context, request *model.ListKVRequest, wait string) boo Project: request.Project, MatchType: request.Match, DomainID: request.Domain, - }) + }, rctx.Ctx) if err != nil { WriteErrResponse(rctx, config.ErrObserveEvent, err.Error()) return true