diff --git a/clients/config_client/config_client.go b/clients/config_client/config_client.go index 2cb15bde..72aa22cc 100644 --- a/clients/config_client/config_client.go +++ b/clients/config_client/config_client.go @@ -394,71 +394,61 @@ func (client *ConfigClient) startInternal() { } func (client *ConfigClient) executeConfigListen() { - listenCachesMap := make(map[int][]cacheData, 16) - needAllSync := time.Since(client.lastAllSyncTime) >= constant.ALL_SYNC_INTERNAL - for _, v := range client.cacheMap.Items() { - cache, ok := v.(cacheData) + var ( + needAllSync = time.Since(client.lastAllSyncTime) >= constant.ALL_SYNC_INTERNAL + hasChangedKeys = false + ) + + listenTaskMap := client.buildListenTask(needAllSync) + if len(listenTaskMap) == 0 { + return + } + + for taskId, caches := range listenTaskMap { + request := buildConfigBatchListenRequest(caches) + rpcClient := client.configProxy.createRpcClient(client.ctx, fmt.Sprintf("%d", taskId), client) + iResponse, err := client.configProxy.requestProxy(rpcClient, request, 3000) + if err != nil { + logger.Warnf("ConfigBatchListenRequest failure, err:%v", err) + continue + } + if iResponse == nil { + logger.Warnf("ConfigBatchListenRequest failure, response is nil") + continue + } + if !iResponse.IsSuccess() { + logger.Warnf("ConfigBatchListenRequest failure, error code:%d", iResponse.GetErrorCode()) + continue + } + response, ok := iResponse.(*rpc_response.ConfigChangeBatchListenResponse) if !ok { continue } - if cache.isSyncWithServer { - if cache.md5 != cache.cacheDataListener.lastMd5 { - cache.executeListener() - } - if !needAllSync { - continue + if len(response.ChangedConfigs) > 0 { + hasChangedKeys = true + } + changeKeys := make(map[string]struct{}, len(response.ChangedConfigs)) + for _, v := range response.ChangedConfigs { + changeKey := util.GetConfigCacheKey(v.DataId, v.Group, v.Tenant) + changeKeys[changeKey] = struct{}{} + if value, ok := client.cacheMap.Get(changeKey); ok { + cData := value.(cacheData) + client.refreshContentAndCheck(cData, !cData.isInitializing) } } - cacheDatas := listenCachesMap[cache.taskId] - cacheDatas = append(cacheDatas, cache) - listenCachesMap[cache.taskId] = cacheDatas - } - hasChangedKeys := false - if len(listenCachesMap) > 0 { - for taskId, listenCaches := range listenCachesMap { - request := buildConfigBatchListenRequest(listenCaches) - rpcClient := client.configProxy.createRpcClient(client.ctx, fmt.Sprintf("%d", taskId), client) - iResponse, err := client.configProxy.requestProxy(rpcClient, request, 3000) - if err != nil { - logger.Warnf("ConfigBatchListenRequest failure,err:%+v", err) - continue - } - if iResponse == nil { - logger.Warnf("ConfigBatchListenRequest failure, response is nil") + for _, v := range caches { + changeKey := util.GetConfigCacheKey(v.dataId, v.group, v.tenant) + if _, ok := changeKeys[changeKey]; !ok { + v.isSyncWithServer = true + client.cacheMap.Set(changeKey, v) continue } - if !iResponse.IsSuccess() { - logger.Warnf("ConfigBatchListenRequest failure, error code:%+v", iResponse.GetErrorCode()) - continue - } - changeKeys := make(map[string]struct{}) - if response, ok := iResponse.(*rpc_response.ConfigChangeBatchListenResponse); ok { - if len(response.ChangedConfigs) > 0 { - hasChangedKeys = true - for _, v := range response.ChangedConfigs { - changeKey := util.GetConfigCacheKey(v.DataId, v.Group, v.Tenant) - changeKeys[changeKey] = struct{}{} - if cache, ok := client.cacheMap.Get(changeKey); !ok { - continue - } else { - cacheData := cache.(cacheData) - client.refreshContentAndCheck(cacheData, !cacheData.isInitializing) - } - } - } - - for _, v := range listenCaches { - changeKey := util.GetConfigCacheKey(v.dataId, v.group, v.tenant) - if _, ok := changeKeys[changeKey]; !ok { - v.isSyncWithServer = true - continue - } - v.isInitializing = true - } - } + v.isInitializing = true + client.cacheMap.Set(changeKey, v) } + } if needAllSync { client.lastAllSyncTime = time.Now() @@ -501,6 +491,28 @@ func (client *ConfigClient) refreshContentAndCheck(cacheData cacheData, notify b } } +func (client *ConfigClient) buildListenTask(needAllSync bool) map[int][]cacheData { + listenTaskMap := make(map[int][]cacheData, 8) + + for _, v := range client.cacheMap.Items() { + data, ok := v.(cacheData) + if !ok { + continue + } + + if data.isSyncWithServer { + if data.md5 != data.cacheDataListener.lastMd5 { + data.executeListener() + } + if !needAllSync { + continue + } + } + listenTaskMap[data.taskId] = append(listenTaskMap[data.taskId], data) + } + return listenTaskMap +} + func (client *ConfigClient) asyncNotifyListenConfig() { go func() { client.listenExecute <- struct{}{}