diff --git a/common/utils/collection.go b/common/utils/collection.go index 5cbafda4a..5608dcc8f 100644 --- a/common/utils/collection.go +++ b/common/utils/collection.go @@ -276,6 +276,18 @@ func (s *SyncMap[K, V]) Range(f func(key K, val V) bool) { } } +// Values +func (s *SyncMap[K, V]) Values() []V { + s.lock.RLock() + defer s.lock.RUnlock() + + ret := make([]V, 0, len(s.m)) + for _, v := range s.m { + ret = append(ret, v) + } + return ret +} + // Delete func (s *SyncMap[K, V]) Delete(key K) { s.lock.Lock() diff --git a/config/watcher.go b/config/watcher.go index 1b89effcb..611788bce 100644 --- a/config/watcher.go +++ b/config/watcher.go @@ -36,14 +36,17 @@ const ( type FileReleaseCallback func(clientId string, rsp *apiconfig.ConfigClientResponse) bool type watchContext struct { - fileReleaseCb FileReleaseCallback - ClientVersion uint64 + watchConfigFiles []*apiconfig.ClientConfigFileInfo + clientId string + fileReleaseCb FileReleaseCallback + ClientVersion uint64 } // watchCenter 处理客户端订阅配置请求,监听配置文件发布事件通知客户端 type watchCenter struct { - subCtx *eventhub.SubscribtionContext - lock sync.Mutex + connManager *connManager + subCtx *eventhub.SubscribtionContext + lock sync.Mutex // fileId -> clientId -> watchContext configFileWatchers *utils.SyncMap[string, *utils.SyncMap[string, *watchContext]] } @@ -95,8 +98,10 @@ func (wc *watchCenter) AddWatcher(clientId string, watchConfigFiles []*apiconfig return newWatchers }) watchers.Store(clientId, &watchContext{ - fileReleaseCb: fileReleaseCb, - ClientVersion: file.Version.GetValue(), + clientId: clientId, + fileReleaseCb: fileReleaseCb, + ClientVersion: file.Version.GetValue(), + watchConfigFiles: watchConfigFiles, }) } } @@ -129,7 +134,9 @@ func (wc *watchCenter) notifyToWatchers(publishConfigFile *model.SimpleConfigFil response := GenConfigFileResponse(publishConfigFile.Namespace, publishConfigFile.Group, publishConfigFile.FileName, "", publishConfigFile.Md5, publishConfigFile.Version) - watchers.Range(func(clientId string, watchCtx *watchContext) bool { + waitNotifiers := watchers.Values() + for _, watchCtx := range waitNotifiers { + clientId := watchCtx.clientId if watchCtx.ClientVersion < publishConfigFile.Version { watchCtx.fileReleaseCb(clientId, response) log.Info("[Config][Watcher] notify to client.", @@ -141,6 +148,5 @@ func (wc *watchCenter) notifyToWatchers(publishConfigFile *model.SimpleConfigFil zap.Uint64("client-version", watchCtx.ClientVersion), zap.Uint64("version", publishConfigFile.Version)) } - return true - }) + } }