diff --git a/sermant-plugins/sermant-mq-grayscale/mq-config-common/pom.xml b/sermant-plugins/sermant-mq-grayscale/mq-config-common/pom.xml index 586c56629f..93be013be1 100644 --- a/sermant-plugins/sermant-mq-grayscale/mq-config-common/pom.xml +++ b/sermant-plugins/sermant-mq-grayscale/mq-config-common/pom.xml @@ -14,7 +14,6 @@ 8 8 - service diff --git a/sermant-plugins/sermant-mq-grayscale/mq-config-service/pom.xml b/sermant-plugins/sermant-mq-grayscale/mq-config-service/pom.xml new file mode 100644 index 0000000000..716fee9eef --- /dev/null +++ b/sermant-plugins/sermant-mq-grayscale/mq-config-service/pom.xml @@ -0,0 +1,47 @@ + + + 4.0.0 + + sermant-mq-grayscale + io.sermant + 1.0.0 + + + mq-config-service + + + 8 + 8 + service + + + + + org.yaml + snakeyaml + + + io.sermant + sermant-agentcore-core + provided + + + io.sermant + mq-config-common + ${project.version} + provided + + + junit + junit + test + + + org.mockito + mockito-core + test + + + \ No newline at end of file diff --git a/sermant-plugins/sermant-mq-grayscale/mq-config-service/src/main/java/io/sermant/mq/grayscale/listener/MqGrayConfigCache.java b/sermant-plugins/sermant-mq-grayscale/mq-config-service/src/main/java/io/sermant/mq/grayscale/listener/MqGrayConfigCache.java new file mode 100644 index 0000000000..b03bd186b1 --- /dev/null +++ b/sermant-plugins/sermant-mq-grayscale/mq-config-service/src/main/java/io/sermant/mq/grayscale/listener/MqGrayConfigCache.java @@ -0,0 +1,50 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.mq.grayscale.listener; + +import io.sermant.mq.grayscale.config.MqGrayscaleConfig; + +/** + * grayscale config cache + * + * @author chengyouling + * @since 2024-09-12 + **/ +public class MqGrayConfigCache { + private static MqGrayscaleConfig cacheConfig; + + /** + * get cache mqGrayscaleConfig + */ + public static MqGrayscaleConfig getCacheConfig() { + return cacheConfig; + } + + /** + * set cache mqGrayscaleConfig + */ + public static void setCacheConfig(MqGrayscaleConfig config) { + cacheConfig = config; + } + + /** + * clear cache mqGrayscaleConfig + */ + public static void clearCacheConfig() { + cacheConfig = new MqGrayscaleConfig(); + } +} diff --git a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/config/RocketMqGrayConfigHandler.java b/sermant-plugins/sermant-mq-grayscale/mq-config-service/src/main/java/io/sermant/mq/grayscale/listener/MqGrayConfigHandler.java similarity index 86% rename from sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/config/RocketMqGrayConfigHandler.java rename to sermant-plugins/sermant-mq-grayscale/mq-config-service/src/main/java/io/sermant/mq/grayscale/listener/MqGrayConfigHandler.java index 33d0696459..1e043c0b00 100644 --- a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/config/RocketMqGrayConfigHandler.java +++ b/sermant-plugins/sermant-mq-grayscale/mq-config-service/src/main/java/io/sermant/mq/grayscale/listener/MqGrayConfigHandler.java @@ -14,14 +14,13 @@ * limitations under the License. */ -package io.sermant.mq.grayscale.rocketmq.config; +package io.sermant.mq.grayscale.listener; import io.sermant.core.common.LoggerFactory; import io.sermant.core.service.dynamicconfig.common.DynamicConfigEvent; import io.sermant.core.service.dynamicconfig.common.DynamicConfigEventType; import io.sermant.core.utils.StringUtils; import io.sermant.mq.grayscale.config.MqGrayscaleConfig; -import io.sermant.mq.grayscale.rocketmq.utils.RocketMqGrayscaleConfigUtils; import org.yaml.snakeyaml.DumperOptions; import org.yaml.snakeyaml.Yaml; @@ -36,7 +35,7 @@ * @author chengyouling * @since 2024-05-27 **/ -public class RocketMqGrayConfigHandler { +public class MqGrayConfigHandler { private static final Logger LOGGER = LoggerFactory.getLogger(); /** @@ -49,7 +48,7 @@ public class RocketMqGrayConfigHandler { /** * construction */ - public RocketMqGrayConfigHandler() { + public MqGrayConfigHandler() { Representer representer = new Representer(new DumperOptions()); representer.getPropertyUtils().setSkipMissingProperties(true); this.yaml = new Yaml(representer); @@ -65,7 +64,7 @@ public void handle(DynamicConfigEvent event) { return; } if (event.getEventType() == DynamicConfigEventType.DELETE) { - RocketMqGrayscaleConfigUtils.resetGrayscaleConfig(); + MqGrayConfigCache.clearCacheConfig(); return; } if (!StringUtils.isEmpty(event.getContent())) { @@ -73,7 +72,7 @@ public void handle(DynamicConfigEvent event) { event.getGroup(), event.getContent())); MqGrayscaleConfig config = yaml.loadAs(event.getContent(), MqGrayscaleConfig.class); if (config != null) { - RocketMqGrayscaleConfigUtils.setGrayscaleConfig(config, event.getEventType()); + MqGrayConfigCache.setCacheConfig(config); } } } diff --git a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/config/RocketMqGrayConfigListener.java b/sermant-plugins/sermant-mq-grayscale/mq-config-service/src/main/java/io/sermant/mq/grayscale/listener/MqGrayConfigListener.java similarity index 79% rename from sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/config/RocketMqGrayConfigListener.java rename to sermant-plugins/sermant-mq-grayscale/mq-config-service/src/main/java/io/sermant/mq/grayscale/listener/MqGrayConfigListener.java index 251f1479cb..0300ddf695 100644 --- a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/config/RocketMqGrayConfigListener.java +++ b/sermant-plugins/sermant-mq-grayscale/mq-config-service/src/main/java/io/sermant/mq/grayscale/listener/MqGrayConfigListener.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package io.sermant.mq.grayscale.rocketmq.config; +package io.sermant.mq.grayscale.listener; import io.sermant.core.service.dynamicconfig.common.DynamicConfigEvent; import io.sermant.core.service.dynamicconfig.common.DynamicConfigListener; @@ -25,14 +25,14 @@ * @author chengyouling * @since 2024-05-27 **/ -public class RocketMqGrayConfigListener implements DynamicConfigListener { - private final RocketMqGrayConfigHandler handler; +public class MqGrayConfigListener implements DynamicConfigListener { + private final MqGrayConfigHandler handler; /** * construction */ - public RocketMqGrayConfigListener() { - handler = new RocketMqGrayConfigHandler(); + public MqGrayConfigListener() { + handler = new MqGrayConfigHandler(); } @Override diff --git a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/service/RocketMqGrayDynamicConfigService.java b/sermant-plugins/sermant-mq-grayscale/mq-config-service/src/main/java/io/sermant/mq/grayscale/service/MqGrayDynamicConfigService.java similarity index 85% rename from sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/service/RocketMqGrayDynamicConfigService.java rename to sermant-plugins/sermant-mq-grayscale/mq-config-service/src/main/java/io/sermant/mq/grayscale/service/MqGrayDynamicConfigService.java index 04b60dfaed..ed8205777e 100644 --- a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/service/RocketMqGrayDynamicConfigService.java +++ b/sermant-plugins/sermant-mq-grayscale/mq-config-service/src/main/java/io/sermant/mq/grayscale/service/MqGrayDynamicConfigService.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package io.sermant.mq.grayscale.rocketmq.service; +package io.sermant.mq.grayscale.service; import io.sermant.core.common.LoggerFactory; import io.sermant.core.config.ConfigManager; @@ -22,7 +22,7 @@ import io.sermant.core.plugin.service.PluginService; import io.sermant.core.plugin.subscribe.CommonGroupConfigSubscriber; import io.sermant.core.plugin.subscribe.ConfigSubscriber; -import io.sermant.mq.grayscale.rocketmq.config.RocketMqGrayConfigListener; +import io.sermant.mq.grayscale.listener.MqGrayConfigListener; import java.util.logging.Logger; @@ -32,13 +32,13 @@ * @author chengyouling * @since 2024-05-27 **/ -public class RocketMqGrayDynamicConfigService implements PluginService { +public class MqGrayDynamicConfigService implements PluginService { private static final Logger LOGGER = LoggerFactory.getLogger(); @Override public void start() { ConfigSubscriber subscriber = new CommonGroupConfigSubscriber( - ConfigManager.getConfig(ServiceMeta.class).getService(), new RocketMqGrayConfigListener(), + ConfigManager.getConfig(ServiceMeta.class).getService(), new MqGrayConfigListener(), "mq-grayscale"); subscriber.subscribe(); LOGGER.info("Success to subscribe mq-grayscale config"); diff --git a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/resources/META-INF/services/io.sermant.core.plugin.service.PluginService b/sermant-plugins/sermant-mq-grayscale/mq-config-service/src/main/resources/META-INF/services/io.sermant.core.plugin.service.PluginService similarity index 89% rename from sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/resources/META-INF/services/io.sermant.core.plugin.service.PluginService rename to sermant-plugins/sermant-mq-grayscale/mq-config-service/src/main/resources/META-INF/services/io.sermant.core.plugin.service.PluginService index b84775de79..ef5284eb25 100644 --- a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/resources/META-INF/services/io.sermant.core.plugin.service.PluginService +++ b/sermant-plugins/sermant-mq-grayscale/mq-config-service/src/main/resources/META-INF/services/io.sermant.core.plugin.service.PluginService @@ -14,4 +14,4 @@ # limitations under the License. # -io.sermant.mq.grayscale.rocketmq.service.RocketMqGrayDynamicConfigService \ No newline at end of file +io.sermant.mq.grayscale.service.MqGrayDynamicConfigService \ No newline at end of file diff --git a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/pom.xml b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/pom.xml index 3f28b253f8..213f302f8f 100644 --- a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/pom.xml +++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/pom.xml @@ -20,12 +20,13 @@ - org.yaml - snakeyaml + io.sermant + mq-config-common + ${project.version} io.sermant - mq-config-common + mq-config-service ${project.version} diff --git a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqAbstractIntercepter.java b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqAbstractIntercepter.java index 69be412961..de6be26307 100644 --- a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqAbstractIntercepter.java +++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqAbstractIntercepter.java @@ -37,6 +37,7 @@ public ExecuteContext before(ExecuteContext context) throws Exception { @Override public ExecuteContext after(ExecuteContext context) throws Exception { + RocketMqGrayscaleConfigUtils.checkAndUpdateCacheGrayConfig(); if (!RocketMqGrayscaleConfigUtils.isPluginEnabled()) { return context; } diff --git a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqConsumerConstructorInterceptor.java b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqConsumerConstructorInterceptor.java index 5cf18bac9c..0141047aa6 100644 --- a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqConsumerConstructorInterceptor.java +++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqConsumerConstructorInterceptor.java @@ -19,7 +19,11 @@ import io.sermant.core.plugin.agent.entity.ExecuteContext; import io.sermant.core.plugin.agent.interceptor.AbstractInterceptor; import io.sermant.core.utils.StringUtils; +import io.sermant.mq.grayscale.listener.MqGrayConfigCache; import io.sermant.mq.grayscale.rocketmq.utils.RocketMqGrayscaleConfigUtils; +import io.sermant.mq.grayscale.rocketmq.utils.RocketMqSubscriptionDataUtils; + +import java.util.concurrent.atomic.AtomicBoolean; /** * DefaultMQPushConsumer/DefaultLitePullConsumer/DefaultMQPullConsumer Constructor method interceptor @@ -29,8 +33,13 @@ * @since 2024-05-27 **/ public class RocketMqConsumerConstructorInterceptor extends AbstractInterceptor { + private final AtomicBoolean cacheConfigInit = new AtomicBoolean(false); + @Override public ExecuteContext before(ExecuteContext context) throws Exception { + if (cacheConfigInit.compareAndSet(false, true)) { + RocketMqGrayscaleConfigUtils.createMqGrayConfigs(); + } if (!RocketMqGrayscaleConfigUtils.isPluginEnabled()) { return context; } diff --git a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqPullConsumerSubscriptionUpdateInterceptor.java b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqPullConsumerSubscriptionUpdateInterceptor.java index d65f9faee2..a174104284 100644 --- a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqPullConsumerSubscriptionUpdateInterceptor.java +++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqPullConsumerSubscriptionUpdateInterceptor.java @@ -67,6 +67,7 @@ private void buildSql92SubscriptionData(ExecuteContext context, SubscriptionData if (StringUtils.isEmpty(RocketMqGrayscaleConfigUtils.getGrayGroupTag())) { RocketMqConsumerGroupAutoCheck.setMqClientInstance(subscriptionData.getTopic(), consumerGroup, instance); } + RocketMqConsumerGroupAutoCheck.queryGrayConsumerGroup(); String subscribeScope = RocketMqSubscriptionDataUtils.buildSubscribeScope(subscriptionData.getTopic(), consumerGroup, namesrvAddr); RocketMqSubscriptionDataUtils.resetsSql92SubscriptionData(subscriptionData, subscribeScope); diff --git a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqSchedulerRebuildSubscriptionInterceptor.java b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqSchedulerRebuildSubscriptionInterceptor.java index 6a7d29514a..4de5affe2a 100644 --- a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqSchedulerRebuildSubscriptionInterceptor.java +++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqSchedulerRebuildSubscriptionInterceptor.java @@ -64,6 +64,8 @@ private void buildSql92SubscriptionData(SubscriptionData subscriptionData, Rebal if (StringUtils.isEmpty(RocketMqGrayscaleConfigUtils.getGrayGroupTag())) { RocketMqConsumerGroupAutoCheck.setMqClientInstance(topic, consumerGroup, instance); } + RocketMqConsumerGroupAutoCheck.queryGrayConsumerGroup(); + RocketMqConsumerGroupAutoCheck.startSchedulerCheckGroupTask(); String namesrvAddr = balance.getmQClientFactory().getClientConfig().getNamesrvAddr(); String subscribeScope = RocketMqSubscriptionDataUtils.buildSubscribeScope(topic, consumerGroup, namesrvAddr); diff --git a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/service/RocketMqConsumerGroupAutoCheck.java b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/service/RocketMqConsumerGroupAutoCheck.java index 00a896a38b..1937a5945f 100644 --- a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/service/RocketMqConsumerGroupAutoCheck.java +++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/service/RocketMqConsumerGroupAutoCheck.java @@ -98,23 +98,35 @@ public static void setMqClientInstance(String topic, String consumerGroup, MQCli if (currentConfig != null && currentConfig.getMqClientInstance() == null) { currentConfig.setMqClientInstance(mqClientInstance); } - autoModeQueryGrayGroupTags(); } - private static void autoModeQueryGrayGroupTags() { - if (RocketMqGrayscaleConfigUtils.getConsumeType() == ConsumeModeEnum.AUTO) { - // sync to obtain current gray consumer group at service start - schedulerCheck(); + /** + * query gray consumer group + */ + public static void queryGrayConsumerGroup() { + if (RocketMqGrayscaleConfigUtils.getConsumeType() == ConsumeModeEnum.AUTO && !START_AUTO_CHECK.get()) { + // sync to obtain current gray consumer group at AUTO mode before start scheduler check group + findGrayConsumerGroupAndUpdateGrayTags(); + } + } - // async to check gray consumer group + /** + * start scheduler check gray consumer is changed + */ + public static void startSchedulerCheckGroupTask() { + if (RocketMqGrayscaleConfigUtils.getConsumeType() == ConsumeModeEnum.AUTO) { if (START_AUTO_CHECK.compareAndSet(false, true)) { - EXECUTOR_SERVICE.scheduleWithFixedDelay(RocketMqConsumerGroupAutoCheck::schedulerCheck, INITIAL_DELAY, + EXECUTOR_SERVICE.scheduleWithFixedDelay( + RocketMqConsumerGroupAutoCheck::findGrayConsumerGroupAndUpdateGrayTags, INITIAL_DELAY, RocketMqGrayscaleConfigUtils.getAutoCheckDelayTime(), TimeUnit.SECONDS); } } } - private static void schedulerCheck() { + /** + * find gray consumer group and update gray tags + */ + public static void findGrayConsumerGroupAndUpdateGrayTags() { if (CONSUMER_CLIENT_CONFIG_MAP.isEmpty()) { return; } @@ -132,7 +144,9 @@ private static void schedulerCheck() { if (clientConfig.getMqClientInstance() == null) { continue; } - findGrayConsumerGroup(clientConfig); + Set grayTags = findGrayConsumerGroupAndGetTags(clientConfig); + LOGGER.log(Level.INFO,"[auto-check] current find gray tags: {0}.", grayTags); + resetAutoCheckGrayTagItems(grayTags, clientConfig); } } @@ -140,44 +154,59 @@ private static void schedulerCheck() { * querying all consumer groups of Topic and Collecting grayGroupTag * * @param clientConfig clientConfig + * @return grayTags */ - private static void findGrayConsumerGroup(RocketMqConsumerClientConfig clientConfig) { + private static Set findGrayConsumerGroupAndGetTags(RocketMqConsumerClientConfig clientConfig) { try { MQClientAPIImpl mqClientApi = clientConfig.getMqClientInstance().getMQClientAPIImpl(); - TopicRouteData topicRouteData = mqClientApi.getTopicRouteInfoFromNameServer(clientConfig.getTopic(), - ROCKET_MQ_READ_TIMEOUT, false); - List brokerList = new ArrayList<>(); - for (BrokerData brokerData : topicRouteData.getBrokerDatas()) { - brokerList.addAll(brokerData.getBrokerAddrs().values()); - } - String brokerAddress = brokerList.get(0); - Set grayTags = new HashSet<>(); + String brokerAddress = getBrokerAddress(clientConfig.getTopic(), mqClientApi); GroupList groupList = mqClientApi.queryTopicConsumeByWho(brokerAddress, clientConfig.getTopic(), ROCKET_MQ_READ_TIMEOUT); - for (String group : groupList.getGroupList()) { - try { - List consumerIds = mqClientApi.getConsumerIdListByGroup(brokerAddress, group, - ROCKET_MQ_READ_TIMEOUT); - if (consumerIds.isEmpty()) { - continue; - } - String grayTag = StringUtils.substringAfterLast(group, clientConfig.getConsumerGroup() + "_"); - if (!StringUtils.isEmpty(grayTag)) { - grayTags.add(grayTag); - } - } catch (RemotingConnectException | RemotingSendRequestException | RemotingTimeoutException - | MQBrokerException | InterruptedException e) { - LOGGER.warning(String.format(Locale.ENGLISH, "[auto-check] can not find ids in group: [%s].", - group)); - } - } - LOGGER.log(Level.INFO,"[auto-check] current find gray tags: {0}.", grayTags); - resetAutoCheckGrayTagItems(grayTags, clientConfig); + return getGrayTagsByConsumerGroup(groupList, brokerAddress, mqClientApi, + clientConfig.getConsumerGroup()); } catch (MQClientException | InterruptedException | RemotingTimeoutException | RemotingSendRequestException | RemotingConnectException | MQBrokerException e) { LOGGER.log(Level.FINE, String.format(Locale.ENGLISH, "[auto-check] error, message: %s", e.getMessage()), e); } + return new HashSet<>(); + } + + private static Set getGrayTagsByConsumerGroup(GroupList groupList, String brokerAddress, + MQClientAPIImpl mqClientApi, String consumerGroup) { + Set grayTags = new HashSet<>(); + for (String group : groupList.getGroupList()) { + try { + List consumerIds = mqClientApi.getConsumerIdListByGroup(brokerAddress, group, + ROCKET_MQ_READ_TIMEOUT); + if (consumerIds.isEmpty()) { + continue; + } + String grayTag = StringUtils.substringAfterLast(group, consumerGroup + "_"); + if (!StringUtils.isEmpty(grayTag)) { + grayTags.add(grayTag); + } + } catch (RemotingConnectException | RemotingSendRequestException | RemotingTimeoutException + | MQBrokerException | InterruptedException e) { + LOGGER.warning(String.format(Locale.ENGLISH, "[auto-check] can not find ids in group: [%s].", + group)); + } + } + return grayTags; + } + + private static String getBrokerAddress(String topic, MQClientAPIImpl mqClientApi) + throws RemotingSendRequestException, RemotingConnectException, RemotingTimeoutException, + InterruptedException, MQClientException { + TopicRouteData topicRouteData = mqClientApi.getTopicRouteInfoFromNameServer(topic, ROCKET_MQ_READ_TIMEOUT, + false); + List brokerList = new ArrayList<>(); + for (BrokerData brokerData : topicRouteData.getBrokerDatas()) { + brokerList.addAll(brokerData.getBrokerAddrs().values()); + } + + // cluster mode has multiple addresses, just select one + return brokerList.get(0); } /** @@ -196,13 +225,8 @@ private static void resetAutoCheckGrayTagItems(Set grayTags, RocketMqCon } return; } - HashSet currentGroups = new HashSet<>(grayTags); - Set lastTags = LAST_TOPIC_GROUP_GRAY_TAG.get(subscribeScope); - if (LAST_TOPIC_GROUP_GRAY_TAG.containsKey(subscribeScope)) { - currentGroups.removeAll(lastTags); - } - List grayTagItems = new ArrayList<>(); - if (!currentGroups.isEmpty() || grayTags.size() != lastTags.size()) { + if (isGrayTagsChanged(grayTags, subscribeScope)) { + List grayTagItems = new ArrayList<>(); for (String grayTag : grayTags) { Optional item = RocketMqGrayscaleConfigUtils.getGrayscaleConfigs().getGrayTagByGroupTag(grayTag); @@ -213,6 +237,15 @@ private static void resetAutoCheckGrayTagItems(Set grayTags, RocketMqCon } } + private static boolean isGrayTagsChanged(Set grayTags, String subscribeScope) { + HashSet currentGroups = new HashSet<>(grayTags); + Set lastTags = LAST_TOPIC_GROUP_GRAY_TAG.get(subscribeScope); + if (LAST_TOPIC_GROUP_GRAY_TAG.containsKey(subscribeScope)) { + currentGroups.removeAll(lastTags); + } + return !currentGroups.isEmpty() || grayTags.size() != lastTags.size(); + } + /** * set consumer client config * diff --git a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/utils/RocketMqGrayscaleConfigUtils.java b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/utils/RocketMqGrayscaleConfigUtils.java index 5d034a68ac..ddce5cc4f0 100644 --- a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/utils/RocketMqGrayscaleConfigUtils.java +++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/utils/RocketMqGrayscaleConfigUtils.java @@ -19,10 +19,10 @@ import io.sermant.core.config.ConfigManager; import io.sermant.core.plugin.config.PluginConfigManager; import io.sermant.core.plugin.config.ServiceMeta; -import io.sermant.core.service.dynamicconfig.common.DynamicConfigEventType; import io.sermant.mq.grayscale.config.ConsumeModeEnum; import io.sermant.mq.grayscale.config.GrayTagItem; import io.sermant.mq.grayscale.config.MqGrayscaleConfig; +import io.sermant.mq.grayscale.listener.MqGrayConfigCache; import org.apache.rocketmq.common.message.Message; @@ -34,6 +34,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.regex.Pattern; /** @@ -113,33 +114,6 @@ public static String standardFormatGroupTag(String grayGroupTag) { return PATTERN.matcher(grayGroupTag.toLowerCase(Locale.ROOT)).replaceAll("-"); } - /** - * reset cache mqGrayscaleConfig - */ - public static void resetGrayscaleConfig() { - cacheConfig = new MqGrayscaleConfig(); - } - - /** - * set/update cache mqGrayscaleConfig - * - * @param config config - * @param eventType eventType - */ - public static void setGrayscaleConfig(MqGrayscaleConfig config, DynamicConfigEventType eventType) { - buildGrayTagsSet(config); - if (eventType == DynamicConfigEventType.CREATE) { - cacheConfig = config; - RocketMqSubscriptionDataUtils.updateChangeFlag(); - return; - } - boolean isAllowRefresh = isAllowRefreshChangeFlag(cacheConfig, config); - if (isAllowRefresh) { - cacheConfig.updateGrayscaleConfig(config); - RocketMqSubscriptionDataUtils.updateChangeFlag(); - } - } - private static void buildGrayTagsSet(MqGrayscaleConfig config) { for (GrayTagItem item : config.getGrayscale()) { if (!item.getTrafficTag().isEmpty()) { @@ -179,6 +153,33 @@ public static boolean isPluginEnabled() { return cacheConfig.isEnabled(); } + /** + * create gray configs from config center + */ + public static void createMqGrayConfigs() { + if (MqGrayConfigCache.getCacheConfig() == null) { + return; + } + cacheConfig = MqGrayConfigCache.getCacheConfig(); + RocketMqSubscriptionDataUtils.updateChangeFlag(); + buildGrayTagsSet(MqGrayConfigCache.getCacheConfig()); + } + + /** + * check config is changed and update gray configs + */ + public static void checkAndUpdateCacheGrayConfig() { + if (MqGrayConfigCache.getCacheConfig() == null) { + return; + } + boolean isAllowRefresh = isAllowRefreshChangeFlag(cacheConfig, MqGrayConfigCache.getCacheConfig()); + if (isAllowRefresh) { + cacheConfig.updateGrayscaleConfig(MqGrayConfigCache.getCacheConfig()); + RocketMqSubscriptionDataUtils.updateChangeFlag(); + buildGrayTagsSet(MqGrayConfigCache.getCacheConfig()); + } + } + /** * compare serviceMeta with mqGrayscaleConfig, set message property * diff --git a/sermant-plugins/sermant-mq-grayscale/pom.xml b/sermant-plugins/sermant-mq-grayscale/pom.xml index 251e49f8db..503066a637 100644 --- a/sermant-plugins/sermant-mq-grayscale/pom.xml +++ b/sermant-plugins/sermant-mq-grayscale/pom.xml @@ -43,6 +43,7 @@ mq-grayscale-rocketmq-plugin mq-config-common + mq-config-service @@ -50,6 +51,7 @@ mq-grayscale-rocketmq-plugin mq-config-common + mq-config-service @@ -57,6 +59,7 @@ mq-grayscale-rocketmq-plugin mq-config-common + mq-config-service