Skip to content

Commit

Permalink
Merge pull request #972 from justforstudy-A/fix-dynamic-bug
Browse files Browse the repository at this point in the history
【fix】处理因插件多次订阅初始化问题导致动态配置插件不能及时刷新宿主配置
  • Loading branch information
robotLJW authored Nov 25, 2022
2 parents 4804c3f + b15d7b9 commit fd95090
Show file tree
Hide file tree
Showing 6 changed files with 115 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,14 @@ public class KieListenerWrapper {
* @param dynamicConfigListener 监听器
* @param kvDataHolder 数据持有器
* @param kieRequest 请求
* @param ifNotify 是否需要初始化通知
*/
public KieListenerWrapper(String key, DynamicConfigListener dynamicConfigListener,
KvDataHolder kvDataHolder, KieRequest kieRequest) {
KvDataHolder kvDataHolder, KieRequest kieRequest, boolean ifNotify) {
this.group = kieRequest.getLabelCondition();
this.kvDataHolder = kvDataHolder;
this.kieRequest = kieRequest;
addKeyListener(key, dynamicConfigListener);
addKeyListener(key, dynamicConfigListener, ifNotify);
}

/**
Expand All @@ -82,34 +83,49 @@ public void notifyListeners(KvDataHolder.EventDataHolder eventDataHolder, boolea
currentVersion = eventDataHolder.getVersion();
if (!eventDataHolder.getAdded().isEmpty()) {
// 新增事件
notify(eventDataHolder.getAdded(), isFirst ? DynamicConfigEventType.INIT : DynamicConfigEventType.CREATE);
notifyAdded(eventDataHolder.getAdded(), eventDataHolder.getLatestData(), isFirst);
}
if (!eventDataHolder.getDeleted().isEmpty()) {
// 删除事件
notify(eventDataHolder.getDeleted(), DynamicConfigEventType.DELETE);
notify(eventDataHolder.getDeleted(), DynamicConfigEventType.DELETE, null);
}
if (!eventDataHolder.getModified().isEmpty()) {
// 修改事件
notify(eventDataHolder.getModified(), DynamicConfigEventType.MODIFY);
notify(eventDataHolder.getModified(), DynamicConfigEventType.MODIFY, null);
}
}

private void notify(Map<String, String> configData, DynamicConfigEventType dynamicConfigEventType) {
private void notifyAdded(Map<String, String> addedData, Map<String, String> latestData, boolean isFirst) {
if (!isFirst) {
notify(addedData, DynamicConfigEventType.CREATE, null);
return;
}
notify(addedData, DynamicConfigEventType.CREATE, latestData);
}

private void notify(Map<String, String> configData, DynamicConfigEventType dynamicConfigEventType,
Map<String, String> latestData) {
for (Map.Entry<String, String> entry : configData.entrySet()) {
// 通知单个key监听器
notifyEvent(entry.getKey(), entry.getValue(), dynamicConfigEventType, false);
notifyEvent(entry.getKey(), entry.getValue(), dynamicConfigEventType, false, latestData);

// 通知该Group的监听器做更新
notifyEvent(entry.getKey(), entry.getValue(), dynamicConfigEventType, true);
notifyEvent(entry.getKey(), entry.getValue(), dynamicConfigEventType, true, latestData);
}
}

private void notifyEvent(String key, String value, DynamicConfigEventType eventType, boolean isGroup) {
private void notifyEvent(String key, String value, DynamicConfigEventType eventType, boolean isGroup,
Map<String, String> latestData) {
final VersionListenerWrapper versionListenerWrapper = keyListenerMap
.get(isGroup ? KieConstants.DEFAULT_GROUP_KEY : key);
if (versionListenerWrapper == null) {
return;
}
notifyEvent(key, value, eventType, versionListenerWrapper, latestData);
}

private void notifyEvent(String key, String value, DynamicConfigEventType eventType, VersionListenerWrapper wrapper,
Map<String, String> latestData) {
DynamicConfigEvent event;
switch (eventType) {
case INIT:
Expand All @@ -128,24 +144,49 @@ private void notifyEvent(String key, String value, DynamicConfigEventType eventT
LOGGER.warning(String.format(Locale.ENGLISH, "Event type [%s] is invalid. ", eventType));
return;
}
processAllListeners(event, versionListenerWrapper);
processAllListeners(event, wrapper, latestData);
}

private void processAllListeners(DynamicConfigEvent event, VersionListenerWrapper versionListenerWrapper) {
if (versionListenerWrapper.listeners != null) {
for (VersionListener versionListener : versionListenerWrapper.listeners) {
try {
if (versionListener.version > currentVersion) {
// 已通知的listener不再通知, 避免针对同一个group的多个不同key进行多次重复通知
return;
}
private void processAllListeners(DynamicConfigEvent event, VersionListenerWrapper versionListenerWrapper,
Map<String, String> latestData) {
if (versionListenerWrapper.listeners == null) {
return;
}
for (VersionListener versionListener : versionListenerWrapper.listeners) {
try {
if (versionListener.version > currentVersion) {
// 已通知的listener不再通知, 避免针对同一个group的多个不同key进行多次重复通知
continue;
}
if (event.getEventType() == DynamicConfigEventType.INIT) {
processInit(latestData, versionListener, event);
} else {
versionListener.listener.process(event);
versionListener.version = currentVersion;
} catch (Exception ex) {
LOGGER.log(Level.WARNING, String.format(Locale.ENGLISH,
"Process config data failed, key: [%s], group: [%s]",
event.getKey(), this.group), ex);
}
versionListener.version = currentVersion;
} catch (Exception ex) {
LOGGER.log(Level.WARNING, String.format(Locale.ENGLISH,
"Process config data failed, key: [%s], group: [%s]",
event.getKey(), this.group), ex);
}
}
}

private void processInit(Map<String, String> latestData, VersionListener versionListener,
DynamicConfigEvent event) {
if (versionListener.isNeedInit && !versionListener.isInitializer && latestData != null) {
// 需要初始化, 但是未初始化, 全量数据通知
for (Map.Entry<String, String> entry : latestData.entrySet()) {
versionListener.listener
.process(DynamicConfigEvent.initEvent(entry.getKey(), event.getGroup(), entry.getValue()));
}
versionListener.isInitializer = true;
versionListener.initVersion = currentVersion;
} else {
if (versionListener.initVersion != 0 && versionListener.initVersion != currentVersion) {
// 其他已经通知过或者不需要通知的采用add事件, 通知前判断其初始化通知的版本是否为当前版本, 若为当前版本则表示已经全量通知过, 无需再次通知
versionListener.listener.process(DynamicConfigEvent.createEvent(event.getKey(), event.getGroup(),
event.getContent()));
}
}
}
Expand All @@ -155,13 +196,14 @@ private void processAllListeners(DynamicConfigEvent event, VersionListenerWrappe
*
* @param key 键
* @param dynamicConfigListener 监听器
* @param ifNotify 是否初始化通知
*/
public void addKeyListener(String key, DynamicConfigListener dynamicConfigListener) {
public void addKeyListener(String key, DynamicConfigListener dynamicConfigListener, boolean ifNotify) {
VersionListenerWrapper versionListenerWrapper = keyListenerMap.get(key);
if (versionListenerWrapper == null) {
versionListenerWrapper = new VersionListenerWrapper();
}
versionListenerWrapper.addListener(dynamicConfigListener);
versionListenerWrapper.addListener(dynamicConfigListener, ifNotify);
keyListenerMap.put(key, versionListenerWrapper);
}

Expand Down Expand Up @@ -222,13 +264,13 @@ static class VersionListenerWrapper {
listeners = new HashSet<>();
}

void addListener(DynamicConfigListener listener) {
listeners.add(new VersionListener(-1L, listener));
void addListener(DynamicConfigListener listener, boolean ifNotify) {
listeners.add(new VersionListener(-1L, listener, ifNotify));
}

boolean removeListener(DynamicConfigListener listener) {
// VersionListener基于listener移除
return listeners.remove(new VersionListener(-1L, listener));
return listeners.remove(new VersionListener(-1L, listener, false));
}
}

Expand All @@ -245,9 +287,25 @@ static class VersionListener {
*/
long version;

VersionListener(long version, DynamicConfigListener listener) {
/**
* 是否被INIT事件通知, 该事件仅通知一次
*/
boolean isInitializer = false;

/**
* 是否需要首次通知
*/
boolean isNeedInit;

/**
* 被初始化的版本
*/
long initVersion;

VersionListener(long version, DynamicConfigListener listener, boolean isNeedInit) {
this.listener = listener;
this.version = version;
this.isNeedInit = isNeedInit;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public EventDataHolder analyzeLatestData(KieResponse response, boolean isFirst)
clear();
}
final Map<String, String> latestData = formatKieResponse(response);
final EventDataHolder eventDataHolder = new EventDataHolder(formatRevision(response.getRevision()));
final EventDataHolder eventDataHolder = new EventDataHolder(formatRevision(response.getRevision()), latestData);
if (currentData != null) {
if (latestData.isEmpty()) {
eventDataHolder.deleted.putAll(currentData);
Expand Down Expand Up @@ -118,6 +118,11 @@ public static class EventDataHolder {
*/
private final long version;

/**
* 最新的全量数据
*/
private final Map<String, String> latestData;

/**
* 修改的key
*/
Expand All @@ -137,12 +142,18 @@ public static class EventDataHolder {
* Constructor.
*
* @param version version
* @param latestData 最新全量数据
*/
public EventDataHolder(long version) {
public EventDataHolder(long version, Map<String, String> latestData) {
modified = new HashMap<String, String>();
deleted = new HashMap<String, String>();
added = new HashMap<String, String>();
this.version = version;
this.latestData = latestData;
}

public Map<String, String> getLatestData() {
return latestData;
}

public Map<String, String> getModified() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ public boolean subscribe(String key, KieRequest kieRequest, DynamicConfigListene
if (oldWrapper == null) {
return firstSubscribeForGroup(key, kieRequest, dynamicConfigListener, ifNotify);
} else {
oldWrapper.addKeyListener(key, dynamicConfigListener);
oldWrapper.addKeyListener(key, dynamicConfigListener, ifNotify);
tryNotify(oldWrapper.getKieRequest(), oldWrapper, ifNotify);
return true;
}
Expand All @@ -301,7 +301,7 @@ private boolean firstSubscribeForGroup(String key, KieRequest kieRequest,
final KieSubscriber kieSubscriber = new KieSubscriber(kieRequest);
Task task;
KieListenerWrapper kieListenerWrapper =
new KieListenerWrapper(key, dynamicConfigListener, new KvDataHolder(), kieRequest);
new KieListenerWrapper(key, dynamicConfigListener, new KvDataHolder(), kieRequest, ifNotify);
if (!kieSubscriber.isLongConnectionRequest()) {
task = new ShortTimerTask(kieSubscriber, kieListenerWrapper);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.huawei.dynamic.config.entity.DynamicConstants;

import com.huaweicloud.sermant.core.common.LoggerFactory;
import com.huaweicloud.sermant.core.service.dynamicconfig.common.DynamicConfigEvent;

import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
Expand Down Expand Up @@ -61,6 +62,9 @@ public class OriginConfigCenterDisableListener implements BeanFactoryAware {
@Autowired
private Environment environment;

@Autowired
private SpringEventPublisher springEventPublisher;

private BeanFactory beanFactory;

/**
Expand All @@ -84,6 +88,10 @@ public void addListener() {
}
}
tryAddDynamicSourceToFirst(environment);

// 发布刷新事件补偿, 及时刷新禁用后的数据
springEventPublisher.publishRefreshEvent(DynamicConfigEvent.modifyEvent(event.getKey(), event.getGroup(),
event.getContent()));
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,10 @@ private void initDynamicConfig() {

/**
* 发布spring刷新事件{@link org.springframework.cloud.endpoint.event.RefreshEvent}
*
* @param event 事件
*/
private void publishRefreshEvent(DynamicConfigEvent event) {
public void publishRefreshEvent(DynamicConfigEvent event) {
if (event.getEventType() == DynamicConfigEventType.INIT) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ public void test() {
pluginConfigManagerMockedStatic.when(() -> PluginConfigManager.getPluginConfig(DynamicConfiguration.class))
.thenReturn(configuration);
final OriginConfigCenterDisableListener originConfigCenterDisableListener = new OriginConfigCenterDisableListener();
ReflectUtils.setFieldValue(originConfigCenterDisableListener, "springEventPublisher",
Mockito.mock(SpringEventPublisher.class));
final BeanFactory beanFactory = Mockito.mock(BeanFactory.class);
originConfigCenterDisableListener.setBeanFactory(beanFactory);
originConfigCenterDisableListener.addListener();
Expand Down

0 comments on commit fd95090

Please sign in to comment.