diff --git a/sermant-agentcore/sermant-agentcore-implement/src/main/java/com/huaweicloud/sermant/implement/service/dynamicconfig/kie/client/kie/KieListenerWrapper.java b/sermant-agentcore/sermant-agentcore-implement/src/main/java/com/huaweicloud/sermant/implement/service/dynamicconfig/kie/client/kie/KieListenerWrapper.java index 2922f4adf7..8a999d0748 100644 --- a/sermant-agentcore/sermant-agentcore-implement/src/main/java/com/huaweicloud/sermant/implement/service/dynamicconfig/kie/client/kie/KieListenerWrapper.java +++ b/sermant-agentcore/sermant-agentcore-implement/src/main/java/com/huaweicloud/sermant/implement/service/dynamicconfig/kie/client/kie/KieListenerWrapper.java @@ -63,13 +63,14 @@ public class KieListenerWrapper { * @param dynamicConfigListener 监听器 * @param kvDataHolder 数据持有器 * @param kieRequest 请求 + * @param ifNotify 是否需要初始化通知 */ public KieListenerWrapper(String key, DynamicConfigListener dynamicConfigListener, - KvDataHolder kvDataHolder, KieRequest kieRequest) { + KvDataHolder kvDataHolder, KieRequest kieRequest, boolean ifNotify) { this.group = kieRequest.getLabelCondition(); this.kvDataHolder = kvDataHolder; this.kieRequest = kieRequest; - addKeyListener(key, dynamicConfigListener); + addKeyListener(key, dynamicConfigListener, ifNotify); } /** @@ -82,34 +83,49 @@ public void notifyListeners(KvDataHolder.EventDataHolder eventDataHolder, boolea currentVersion = eventDataHolder.getVersion(); if (!eventDataHolder.getAdded().isEmpty()) { // 新增事件 - notify(eventDataHolder.getAdded(), isFirst ? DynamicConfigEventType.INIT : DynamicConfigEventType.CREATE); + notifyAdded(eventDataHolder.getAdded(), eventDataHolder.getLatestData(), isFirst); } if (!eventDataHolder.getDeleted().isEmpty()) { // 删除事件 - notify(eventDataHolder.getDeleted(), DynamicConfigEventType.DELETE); + notify(eventDataHolder.getDeleted(), DynamicConfigEventType.DELETE, null); } if (!eventDataHolder.getModified().isEmpty()) { // 修改事件 - notify(eventDataHolder.getModified(), DynamicConfigEventType.MODIFY); + notify(eventDataHolder.getModified(), DynamicConfigEventType.MODIFY, null); } } - private void notify(Map configData, DynamicConfigEventType dynamicConfigEventType) { + private void notifyAdded(Map addedData, Map latestData, boolean isFirst) { + if (!isFirst) { + notify(addedData, DynamicConfigEventType.CREATE, null); + return; + } + notify(addedData, DynamicConfigEventType.CREATE, latestData); + } + + private void notify(Map configData, DynamicConfigEventType dynamicConfigEventType, + Map latestData) { for (Map.Entry entry : configData.entrySet()) { // 通知单个key监听器 - notifyEvent(entry.getKey(), entry.getValue(), dynamicConfigEventType, false); + notifyEvent(entry.getKey(), entry.getValue(), dynamicConfigEventType, false, latestData); // 通知该Group的监听器做更新 - notifyEvent(entry.getKey(), entry.getValue(), dynamicConfigEventType, true); + notifyEvent(entry.getKey(), entry.getValue(), dynamicConfigEventType, true, latestData); } } - private void notifyEvent(String key, String value, DynamicConfigEventType eventType, boolean isGroup) { + private void notifyEvent(String key, String value, DynamicConfigEventType eventType, boolean isGroup, + Map latestData) { final VersionListenerWrapper versionListenerWrapper = keyListenerMap .get(isGroup ? KieConstants.DEFAULT_GROUP_KEY : key); if (versionListenerWrapper == null) { return; } + notifyEvent(key, value, eventType, versionListenerWrapper, latestData); + } + + private void notifyEvent(String key, String value, DynamicConfigEventType eventType, VersionListenerWrapper wrapper, + Map latestData) { DynamicConfigEvent event; switch (eventType) { case INIT: @@ -128,24 +144,49 @@ private void notifyEvent(String key, String value, DynamicConfigEventType eventT LOGGER.warning(String.format(Locale.ENGLISH, "Event type [%s] is invalid. ", eventType)); return; } - processAllListeners(event, versionListenerWrapper); + processAllListeners(event, wrapper, latestData); } - private void processAllListeners(DynamicConfigEvent event, VersionListenerWrapper versionListenerWrapper) { - if (versionListenerWrapper.listeners != null) { - for (VersionListener versionListener : versionListenerWrapper.listeners) { - try { - if (versionListener.version > currentVersion) { - // 已通知的listener不再通知, 避免针对同一个group的多个不同key进行多次重复通知 - return; - } + private void processAllListeners(DynamicConfigEvent event, VersionListenerWrapper versionListenerWrapper, + Map latestData) { + if (versionListenerWrapper.listeners == null) { + return; + } + for (VersionListener versionListener : versionListenerWrapper.listeners) { + try { + if (versionListener.version > currentVersion) { + // 已通知的listener不再通知, 避免针对同一个group的多个不同key进行多次重复通知 + continue; + } + if (event.getEventType() == DynamicConfigEventType.INIT) { + processInit(latestData, versionListener, event); + } else { versionListener.listener.process(event); - versionListener.version = currentVersion; - } catch (Exception ex) { - LOGGER.log(Level.WARNING, String.format(Locale.ENGLISH, - "Process config data failed, key: [%s], group: [%s]", - event.getKey(), this.group), ex); } + versionListener.version = currentVersion; + } catch (Exception ex) { + LOGGER.log(Level.WARNING, String.format(Locale.ENGLISH, + "Process config data failed, key: [%s], group: [%s]", + event.getKey(), this.group), ex); + } + } + } + + private void processInit(Map latestData, VersionListener versionListener, + DynamicConfigEvent event) { + if (versionListener.isNeedInit && !versionListener.isInitializer && latestData != null) { + // 需要初始化, 但是未初始化, 全量数据通知 + for (Map.Entry entry : latestData.entrySet()) { + versionListener.listener + .process(DynamicConfigEvent.initEvent(entry.getKey(), event.getGroup(), entry.getValue())); + } + versionListener.isInitializer = true; + versionListener.initVersion = currentVersion; + } else { + if (versionListener.initVersion != 0 && versionListener.initVersion != currentVersion) { + // 其他已经通知过或者不需要通知的采用add事件, 通知前判断其初始化通知的版本是否为当前版本, 若为当前版本则表示已经全量通知过, 无需再次通知 + versionListener.listener.process(DynamicConfigEvent.createEvent(event.getKey(), event.getGroup(), + event.getContent())); } } } @@ -155,13 +196,14 @@ private void processAllListeners(DynamicConfigEvent event, VersionListenerWrappe * * @param key 键 * @param dynamicConfigListener 监听器 + * @param ifNotify 是否初始化通知 */ - public void addKeyListener(String key, DynamicConfigListener dynamicConfigListener) { + public void addKeyListener(String key, DynamicConfigListener dynamicConfigListener, boolean ifNotify) { VersionListenerWrapper versionListenerWrapper = keyListenerMap.get(key); if (versionListenerWrapper == null) { versionListenerWrapper = new VersionListenerWrapper(); } - versionListenerWrapper.addListener(dynamicConfigListener); + versionListenerWrapper.addListener(dynamicConfigListener, ifNotify); keyListenerMap.put(key, versionListenerWrapper); } @@ -222,13 +264,13 @@ static class VersionListenerWrapper { listeners = new HashSet<>(); } - void addListener(DynamicConfigListener listener) { - listeners.add(new VersionListener(-1L, listener)); + void addListener(DynamicConfigListener listener, boolean ifNotify) { + listeners.add(new VersionListener(-1L, listener, ifNotify)); } boolean removeListener(DynamicConfigListener listener) { // VersionListener基于listener移除 - return listeners.remove(new VersionListener(-1L, listener)); + return listeners.remove(new VersionListener(-1L, listener, false)); } } @@ -245,9 +287,25 @@ static class VersionListener { */ long version; - VersionListener(long version, DynamicConfigListener listener) { + /** + * 是否被INIT事件通知, 该事件仅通知一次 + */ + boolean isInitializer = false; + + /** + * 是否需要首次通知 + */ + boolean isNeedInit; + + /** + * 被初始化的版本 + */ + long initVersion; + + VersionListener(long version, DynamicConfigListener listener, boolean isNeedInit) { this.listener = listener; this.version = version; + this.isNeedInit = isNeedInit; } @Override diff --git a/sermant-agentcore/sermant-agentcore-implement/src/main/java/com/huaweicloud/sermant/implement/service/dynamicconfig/kie/listener/KvDataHolder.java b/sermant-agentcore/sermant-agentcore-implement/src/main/java/com/huaweicloud/sermant/implement/service/dynamicconfig/kie/listener/KvDataHolder.java index 83194fe1b9..f0f32c26f0 100644 --- a/sermant-agentcore/sermant-agentcore-implement/src/main/java/com/huaweicloud/sermant/implement/service/dynamicconfig/kie/listener/KvDataHolder.java +++ b/sermant-agentcore/sermant-agentcore-implement/src/main/java/com/huaweicloud/sermant/implement/service/dynamicconfig/kie/listener/KvDataHolder.java @@ -47,7 +47,7 @@ public EventDataHolder analyzeLatestData(KieResponse response, boolean isFirst) clear(); } final Map latestData = formatKieResponse(response); - final EventDataHolder eventDataHolder = new EventDataHolder(formatRevision(response.getRevision())); + final EventDataHolder eventDataHolder = new EventDataHolder(formatRevision(response.getRevision()), latestData); if (currentData != null) { if (latestData.isEmpty()) { eventDataHolder.deleted.putAll(currentData); @@ -118,6 +118,11 @@ public static class EventDataHolder { */ private final long version; + /** + * 最新的全量数据 + */ + private final Map latestData; + /** * 修改的key */ @@ -137,12 +142,18 @@ public static class EventDataHolder { * Constructor. * * @param version version + * @param latestData 最新全量数据 */ - public EventDataHolder(long version) { + public EventDataHolder(long version, Map latestData) { modified = new HashMap(); deleted = new HashMap(); added = new HashMap(); this.version = version; + this.latestData = latestData; + } + + public Map getLatestData() { + return latestData; } public Map getModified() { diff --git a/sermant-agentcore/sermant-agentcore-implement/src/main/java/com/huaweicloud/sermant/implement/service/dynamicconfig/kie/listener/SubscriberManager.java b/sermant-agentcore/sermant-agentcore-implement/src/main/java/com/huaweicloud/sermant/implement/service/dynamicconfig/kie/listener/SubscriberManager.java index 8317668fce..6f5f60b373 100644 --- a/sermant-agentcore/sermant-agentcore-implement/src/main/java/com/huaweicloud/sermant/implement/service/dynamicconfig/kie/listener/SubscriberManager.java +++ b/sermant-agentcore/sermant-agentcore-implement/src/main/java/com/huaweicloud/sermant/implement/service/dynamicconfig/kie/listener/SubscriberManager.java @@ -284,7 +284,7 @@ public boolean subscribe(String key, KieRequest kieRequest, DynamicConfigListene if (oldWrapper == null) { return firstSubscribeForGroup(key, kieRequest, dynamicConfigListener, ifNotify); } else { - oldWrapper.addKeyListener(key, dynamicConfigListener); + oldWrapper.addKeyListener(key, dynamicConfigListener, ifNotify); tryNotify(oldWrapper.getKieRequest(), oldWrapper, ifNotify); return true; } @@ -301,7 +301,7 @@ private boolean firstSubscribeForGroup(String key, KieRequest kieRequest, final KieSubscriber kieSubscriber = new KieSubscriber(kieRequest); Task task; KieListenerWrapper kieListenerWrapper = - new KieListenerWrapper(key, dynamicConfigListener, new KvDataHolder(), kieRequest); + new KieListenerWrapper(key, dynamicConfigListener, new KvDataHolder(), kieRequest, ifNotify); if (!kieSubscriber.isLongConnectionRequest()) { task = new ShortTimerTask(kieSubscriber, kieListenerWrapper); } else { diff --git a/sermant-plugins/sermant-dynamic-config/dynamic-config-plugin/src/main/java/com/huawei/dynamic/config/source/OriginConfigCenterDisableListener.java b/sermant-plugins/sermant-dynamic-config/dynamic-config-plugin/src/main/java/com/huawei/dynamic/config/source/OriginConfigCenterDisableListener.java index 9ef32916a5..19936291f3 100644 --- a/sermant-plugins/sermant-dynamic-config/dynamic-config-plugin/src/main/java/com/huawei/dynamic/config/source/OriginConfigCenterDisableListener.java +++ b/sermant-plugins/sermant-dynamic-config/dynamic-config-plugin/src/main/java/com/huawei/dynamic/config/source/OriginConfigCenterDisableListener.java @@ -22,6 +22,7 @@ import com.huawei.dynamic.config.entity.DynamicConstants; import com.huaweicloud.sermant.core.common.LoggerFactory; +import com.huaweicloud.sermant.core.service.dynamicconfig.common.DynamicConfigEvent; import org.springframework.beans.BeansException; import org.springframework.beans.factory.BeanFactory; @@ -61,6 +62,9 @@ public class OriginConfigCenterDisableListener implements BeanFactoryAware { @Autowired private Environment environment; + @Autowired + private SpringEventPublisher springEventPublisher; + private BeanFactory beanFactory; /** @@ -84,6 +88,10 @@ public void addListener() { } } tryAddDynamicSourceToFirst(environment); + + // 发布刷新事件补偿, 及时刷新禁用后的数据 + springEventPublisher.publishRefreshEvent(DynamicConfigEvent.modifyEvent(event.getKey(), event.getGroup(), + event.getContent())); }); } diff --git a/sermant-plugins/sermant-dynamic-config/dynamic-config-plugin/src/main/java/com/huawei/dynamic/config/source/SpringEventPublisher.java b/sermant-plugins/sermant-dynamic-config/dynamic-config-plugin/src/main/java/com/huawei/dynamic/config/source/SpringEventPublisher.java index fdc281a7d2..4adede1925 100644 --- a/sermant-plugins/sermant-dynamic-config/dynamic-config-plugin/src/main/java/com/huawei/dynamic/config/source/SpringEventPublisher.java +++ b/sermant-plugins/sermant-dynamic-config/dynamic-config-plugin/src/main/java/com/huawei/dynamic/config/source/SpringEventPublisher.java @@ -49,8 +49,10 @@ private void initDynamicConfig() { /** * 发布spring刷新事件{@link org.springframework.cloud.endpoint.event.RefreshEvent} + * + * @param event 事件 */ - private void publishRefreshEvent(DynamicConfigEvent event) { + public void publishRefreshEvent(DynamicConfigEvent event) { if (event.getEventType() == DynamicConfigEventType.INIT) { return; } diff --git a/sermant-plugins/sermant-dynamic-config/dynamic-config-plugin/src/test/java/com/huawei/dynamic/config/source/OriginConfigCenterDisableListenerTest.java b/sermant-plugins/sermant-dynamic-config/dynamic-config-plugin/src/test/java/com/huawei/dynamic/config/source/OriginConfigCenterDisableListenerTest.java index 61c2b668d4..06dd003a08 100644 --- a/sermant-plugins/sermant-dynamic-config/dynamic-config-plugin/src/test/java/com/huawei/dynamic/config/source/OriginConfigCenterDisableListenerTest.java +++ b/sermant-plugins/sermant-dynamic-config/dynamic-config-plugin/src/test/java/com/huawei/dynamic/config/source/OriginConfigCenterDisableListenerTest.java @@ -84,6 +84,8 @@ public void test() { pluginConfigManagerMockedStatic.when(() -> PluginConfigManager.getPluginConfig(DynamicConfiguration.class)) .thenReturn(configuration); final OriginConfigCenterDisableListener originConfigCenterDisableListener = new OriginConfigCenterDisableListener(); + ReflectUtils.setFieldValue(originConfigCenterDisableListener, "springEventPublisher", + Mockito.mock(SpringEventPublisher.class)); final BeanFactory beanFactory = Mockito.mock(BeanFactory.class); originConfigCenterDisableListener.setBeanFactory(beanFactory); originConfigCenterDisableListener.addListener();