diff --git a/sermant-plugins/sermant-mq-grayscale/mq-config-common/pom.xml b/sermant-plugins/sermant-mq-grayscale/mq-config-common/pom.xml
new file mode 100644
index 0000000000..49783ded82
--- /dev/null
+++ b/sermant-plugins/sermant-mq-grayscale/mq-config-common/pom.xml
@@ -0,0 +1,37 @@
+
+
+ 4.0.0
+
+ sermant-mq-grayscale
+ io.sermant
+ 1.0.0
+
+
+ mq-config-common
+ jar
+
+
+ 8
+ 8
+
+
+
+
+ io.sermant
+ sermant-agentcore-core
+ provided
+
+
+ junit
+ junit
+ test
+
+
+ org.mockito
+ mockito-core
+ test
+
+
+
\ No newline at end of file
diff --git a/sermant-plugins/sermant-mq-grayscale/sermant-mq-grayscale-plugin/src/main/java/io/sermant/mq/grayscale/config/BaseMessage.java b/sermant-plugins/sermant-mq-grayscale/mq-config-common/src/main/java/io/sermant/mq/grayscale/config/BaseMessage.java
similarity index 100%
rename from sermant-plugins/sermant-mq-grayscale/sermant-mq-grayscale-plugin/src/main/java/io/sermant/mq/grayscale/config/BaseMessage.java
rename to sermant-plugins/sermant-mq-grayscale/mq-config-common/src/main/java/io/sermant/mq/grayscale/config/BaseMessage.java
diff --git a/sermant-plugins/sermant-mq-grayscale/sermant-mq-grayscale-plugin/src/main/java/io/sermant/mq/grayscale/config/ConsumeModeEnum.java b/sermant-plugins/sermant-mq-grayscale/mq-config-common/src/main/java/io/sermant/mq/grayscale/config/ConsumeModeEnum.java
similarity index 100%
rename from sermant-plugins/sermant-mq-grayscale/sermant-mq-grayscale-plugin/src/main/java/io/sermant/mq/grayscale/config/ConsumeModeEnum.java
rename to sermant-plugins/sermant-mq-grayscale/mq-config-common/src/main/java/io/sermant/mq/grayscale/config/ConsumeModeEnum.java
diff --git a/sermant-plugins/sermant-mq-grayscale/sermant-mq-grayscale-plugin/src/main/java/io/sermant/mq/grayscale/config/GrayTagItem.java b/sermant-plugins/sermant-mq-grayscale/mq-config-common/src/main/java/io/sermant/mq/grayscale/config/GrayTagItem.java
similarity index 100%
rename from sermant-plugins/sermant-mq-grayscale/sermant-mq-grayscale-plugin/src/main/java/io/sermant/mq/grayscale/config/GrayTagItem.java
rename to sermant-plugins/sermant-mq-grayscale/mq-config-common/src/main/java/io/sermant/mq/grayscale/config/GrayTagItem.java
diff --git a/sermant-plugins/sermant-mq-grayscale/mq-config-common/src/main/java/io/sermant/mq/grayscale/config/MqGrayConfigCache.java b/sermant-plugins/sermant-mq-grayscale/mq-config-common/src/main/java/io/sermant/mq/grayscale/config/MqGrayConfigCache.java
new file mode 100644
index 0000000000..0e4e224303
--- /dev/null
+++ b/sermant-plugins/sermant-mq-grayscale/mq-config-common/src/main/java/io/sermant/mq/grayscale/config/MqGrayConfigCache.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright (C) 2024-2024 Sermant Authors. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.sermant.mq.grayscale.config;
+
+import io.sermant.core.plugin.config.PluginConfigManager;
+import io.sermant.core.service.dynamicconfig.common.DynamicConfigEventType;
+import io.sermant.mq.grayscale.config.rocketmq.RocketMqConfigUtils;
+
+/**
+ * grayscale config cache
+ *
+ * @author chengyouling
+ * @since 2024-09-12
+ **/
+public class MqGrayConfigCache {
+ private static MqGrayscaleConfig cacheConfig = PluginConfigManager.getPluginConfig(MqGrayscaleConfig.class);
+
+ private MqGrayConfigCache() {
+ }
+
+ /**
+ * get cache mqGrayscaleConfig
+ *
+ * @return mqGrayscaleConfig
+ */
+ public static MqGrayscaleConfig getCacheConfig() {
+ return cacheConfig;
+ }
+
+ /**
+ * set cache mqGrayscaleConfig
+ *
+ * @param config mqGrayscaleConfig
+ * @param eventType eventType
+ */
+ public static void setCacheConfig(MqGrayscaleConfig config, DynamicConfigEventType eventType) {
+ if (eventType == DynamicConfigEventType.CREATE) {
+ cacheConfig = config;
+ RocketMqConfigUtils.updateChangeFlag();
+ RocketMqConfigUtils.recordTrafficTagsSet(config);
+ return;
+ }
+ boolean isAllowRefresh = isAllowRefreshChangeFlag(cacheConfig, config);
+ if (isAllowRefresh) {
+ cacheConfig.updateGrayscaleConfig(MqGrayConfigCache.getCacheConfig());
+ RocketMqConfigUtils.updateChangeFlag();
+ RocketMqConfigUtils.recordTrafficTagsSet(config);
+ }
+ }
+
+ /**
+ * clear cache mqGrayscaleConfig
+ */
+ public static void clearCacheConfig() {
+ cacheConfig = new MqGrayscaleConfig();
+ RocketMqConfigUtils.updateChangeFlag();
+ }
+
+ /**
+ * only traffic label changes allow refresh tag change map to rebuild SQL92 query statement,
+ * because if the serviceMeta changed, the gray consumer cannot be matched and becomes a base consumer
+ * so, if you need to change the env tag, restart all services.
+ *
+ * @param resource cache config
+ * @param target cache config
+ * @return boolean
+ */
+ private static boolean isAllowRefreshChangeFlag(MqGrayscaleConfig resource, MqGrayscaleConfig target) {
+ if (resource.isEnabled() != target.isEnabled()) {
+ return true;
+ }
+ if (resource.isBaseExcludeGroupTagsChanged(target)) {
+ return true;
+ }
+ if (resource.isConsumerModeChanged(target)) {
+ return true;
+ }
+ return !resource.buildAllTrafficTagInfoToStr().equals(target.buildAllTrafficTagInfoToStr());
+ }
+}
diff --git a/sermant-plugins/sermant-mq-grayscale/sermant-mq-grayscale-plugin/src/main/java/io/sermant/mq/grayscale/config/MqGrayscaleConfig.java b/sermant-plugins/sermant-mq-grayscale/mq-config-common/src/main/java/io/sermant/mq/grayscale/config/MqGrayscaleConfig.java
similarity index 96%
rename from sermant-plugins/sermant-mq-grayscale/sermant-mq-grayscale-plugin/src/main/java/io/sermant/mq/grayscale/config/MqGrayscaleConfig.java
rename to sermant-plugins/sermant-mq-grayscale/mq-config-common/src/main/java/io/sermant/mq/grayscale/config/MqGrayscaleConfig.java
index 0eb33642ed..f2bd3dba00 100644
--- a/sermant-plugins/sermant-mq-grayscale/sermant-mq-grayscale-plugin/src/main/java/io/sermant/mq/grayscale/config/MqGrayscaleConfig.java
+++ b/sermant-plugins/sermant-mq-grayscale/mq-config-common/src/main/java/io/sermant/mq/grayscale/config/MqGrayscaleConfig.java
@@ -18,7 +18,6 @@
import io.sermant.core.config.common.ConfigTypeKey;
import io.sermant.core.plugin.config.PluginConfig;
-import io.sermant.mq.grayscale.utils.SubscriptionDataUtils;
import java.util.ArrayList;
import java.util.HashMap;
@@ -35,6 +34,11 @@
**/
@ConfigTypeKey("grayscale.mq.config")
public class MqGrayscaleConfig implements PluginConfig {
+ /**
+ * afa symbol
+ */
+ private static final String AFA_SYMBOL = "@";
+
private boolean enabled = false;
private List grayscale = new ArrayList<>();
@@ -122,12 +126,12 @@ public String buildAllTrafficTagInfoToStr() {
StringBuilder sb = new StringBuilder();
for (GrayTagItem item : grayscale) {
if (sb.length() > 0) {
- sb.append(SubscriptionDataUtils.AFA_SYMBOL);
+ sb.append(AFA_SYMBOL);
}
sb.append(item.getConsumerGroupTag());
for (Map.Entry entry : item.getTrafficTag().entrySet()) {
sb.append(entry.getKey())
- .append(SubscriptionDataUtils.AFA_SYMBOL)
+ .append(AFA_SYMBOL)
.append(entry.getValue());
}
}
diff --git a/sermant-plugins/sermant-mq-grayscale/mq-config-common/src/main/java/io/sermant/mq/grayscale/config/rocketmq/RocketMqConfigUtils.java b/sermant-plugins/sermant-mq-grayscale/mq-config-common/src/main/java/io/sermant/mq/grayscale/config/rocketmq/RocketMqConfigUtils.java
new file mode 100644
index 0000000000..c7ad5ddf7d
--- /dev/null
+++ b/sermant-plugins/sermant-mq-grayscale/mq-config-common/src/main/java/io/sermant/mq/grayscale/config/rocketmq/RocketMqConfigUtils.java
@@ -0,0 +1,125 @@
+/*
+ * Copyright (C) 2024-2024 Sermant Authors. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.sermant.mq.grayscale.config.rocketmq;
+
+import io.sermant.mq.grayscale.config.GrayTagItem;
+import io.sermant.mq.grayscale.config.MqGrayscaleConfig;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * rocketmq config utils
+ *
+ * @author chengyouling
+ * @since 2024-09-13
+ */
+public class RocketMqConfigUtils {
+ /**
+ * base instance subscript gray tag change flags
+ * key: namesrvAddr@topic@consumerGroup
+ * value: change flag
+ */
+ private static final Map BASE_GROUP_TAG_CHANGE_MAP = new ConcurrentHashMap<>();
+
+ /**
+ * gray instance subscript gray tag change flags
+ * key: namesrvAddr@topic@consumerGroup
+ * value: change flag
+ */
+ private static final Map GRAY_GROUP_TAG_CHANGE_MAP = new ConcurrentHashMap<>();
+
+ /**
+ * all traffic tags that's been set, using for sql92 expression reset
+ */
+ private static final Set GRAY_TAGS_SET = new HashSet<>();
+
+ private RocketMqConfigUtils() {
+ }
+
+ /**
+ * set base consumer address@topic@group correspondents change flag
+ *
+ * @param subscribeScope subscribeScope
+ * @param flag flag
+ */
+ public static void setBaseGroupTagChangeMap(String subscribeScope, boolean flag) {
+ BASE_GROUP_TAG_CHANGE_MAP.put(subscribeScope, flag);
+ }
+
+ /**
+ * set gray consumer address@topic@group correspondents change flag
+ *
+ * @param subscribeScope subscribeScope
+ * @param flag flag
+ */
+ public static void setGrayGroupTagChangeMap(String subscribeScope, boolean flag) {
+ GRAY_GROUP_TAG_CHANGE_MAP.put(subscribeScope, flag);
+ }
+
+ /**
+ * get base consumer address@topic@group correspondents change flag
+ *
+ * @param subscribeScope subscribeScope
+ * @return changeFlag
+ */
+ public static boolean getBaseGroupTagChangeMap(String subscribeScope) {
+ return BASE_GROUP_TAG_CHANGE_MAP.get(subscribeScope) != null && BASE_GROUP_TAG_CHANGE_MAP.get(subscribeScope);
+ }
+
+ /**
+ * get gray consumer address@topic@group correspondents change flag
+ *
+ * @param subscribeScope subscribeScope
+ * @return changeFlag
+ */
+ public static boolean getGrayGroupTagChangeMap(String subscribeScope) {
+ return GRAY_GROUP_TAG_CHANGE_MAP.get(subscribeScope) != null && GRAY_GROUP_TAG_CHANGE_MAP.get(subscribeScope);
+ }
+
+ /**
+ * update all consumer gray tag change flag
+ */
+ public static void updateChangeFlag() {
+ BASE_GROUP_TAG_CHANGE_MAP.replaceAll((k, v) -> true);
+ GRAY_GROUP_TAG_CHANGE_MAP.replaceAll((k, v) -> true);
+ }
+
+ /**
+ * records traffic labels for historical and current configuration settings
+ *
+ * @param config config
+ */
+ public static void recordTrafficTagsSet(MqGrayscaleConfig config) {
+ for (GrayTagItem item : config.getGrayscale()) {
+ if (!item.getTrafficTag().isEmpty()) {
+ GRAY_TAGS_SET.addAll(item.getTrafficTag().keySet());
+ }
+ }
+ }
+
+ /**
+ * get all config set gray tags
+ *
+ * @return all config set gray tags
+ */
+ public static Set getGrayTagsSet() {
+ return GRAY_TAGS_SET;
+ }
+}
diff --git a/sermant-plugins/sermant-mq-grayscale/sermant-mq-grayscale-plugin/src/main/resources/META-INF/services/io.sermant.core.plugin.config.PluginConfig b/sermant-plugins/sermant-mq-grayscale/mq-config-common/src/main/resources/META-INF/services/io.sermant.core.plugin.config.PluginConfig
similarity index 100%
rename from sermant-plugins/sermant-mq-grayscale/sermant-mq-grayscale-plugin/src/main/resources/META-INF/services/io.sermant.core.plugin.config.PluginConfig
rename to sermant-plugins/sermant-mq-grayscale/mq-config-common/src/main/resources/META-INF/services/io.sermant.core.plugin.config.PluginConfig
diff --git a/sermant-plugins/sermant-mq-grayscale/mq-config-service/pom.xml b/sermant-plugins/sermant-mq-grayscale/mq-config-service/pom.xml
new file mode 100644
index 0000000000..716fee9eef
--- /dev/null
+++ b/sermant-plugins/sermant-mq-grayscale/mq-config-service/pom.xml
@@ -0,0 +1,47 @@
+
+
+ 4.0.0
+
+ sermant-mq-grayscale
+ io.sermant
+ 1.0.0
+
+
+ mq-config-service
+
+
+ 8
+ 8
+ service
+
+
+
+
+ org.yaml
+ snakeyaml
+
+
+ io.sermant
+ sermant-agentcore-core
+ provided
+
+
+ io.sermant
+ mq-config-common
+ ${project.version}
+ provided
+
+
+ junit
+ junit
+ test
+
+
+ org.mockito
+ mockito-core
+ test
+
+
+
\ No newline at end of file
diff --git a/sermant-plugins/sermant-mq-grayscale/sermant-mq-grayscale-plugin/src/main/java/io/sermant/mq/grayscale/config/MqGrayConfigHandler.java b/sermant-plugins/sermant-mq-grayscale/mq-config-service/src/main/java/io/sermant/mq/grayscale/listener/MqGrayConfigHandler.java
similarity index 89%
rename from sermant-plugins/sermant-mq-grayscale/sermant-mq-grayscale-plugin/src/main/java/io/sermant/mq/grayscale/config/MqGrayConfigHandler.java
rename to sermant-plugins/sermant-mq-grayscale/mq-config-service/src/main/java/io/sermant/mq/grayscale/listener/MqGrayConfigHandler.java
index 413ec90b59..90cc799b03 100644
--- a/sermant-plugins/sermant-mq-grayscale/sermant-mq-grayscale-plugin/src/main/java/io/sermant/mq/grayscale/config/MqGrayConfigHandler.java
+++ b/sermant-plugins/sermant-mq-grayscale/mq-config-service/src/main/java/io/sermant/mq/grayscale/listener/MqGrayConfigHandler.java
@@ -14,13 +14,14 @@
* limitations under the License.
*/
-package io.sermant.mq.grayscale.config;
+package io.sermant.mq.grayscale.listener;
import io.sermant.core.common.LoggerFactory;
import io.sermant.core.service.dynamicconfig.common.DynamicConfigEvent;
import io.sermant.core.service.dynamicconfig.common.DynamicConfigEventType;
import io.sermant.core.utils.StringUtils;
-import io.sermant.mq.grayscale.utils.MqGrayscaleConfigUtils;
+import io.sermant.mq.grayscale.config.MqGrayConfigCache;
+import io.sermant.mq.grayscale.config.MqGrayscaleConfig;
import org.yaml.snakeyaml.DumperOptions;
import org.yaml.snakeyaml.Yaml;
@@ -64,7 +65,7 @@ public void handle(DynamicConfigEvent event) {
return;
}
if (event.getEventType() == DynamicConfigEventType.DELETE) {
- MqGrayscaleConfigUtils.resetGrayscaleConfig();
+ MqGrayConfigCache.clearCacheConfig();
return;
}
if (!StringUtils.isEmpty(event.getContent())) {
@@ -72,7 +73,7 @@ public void handle(DynamicConfigEvent event) {
event.getGroup(), event.getContent()));
MqGrayscaleConfig config = yaml.loadAs(event.getContent(), MqGrayscaleConfig.class);
if (config != null) {
- MqGrayscaleConfigUtils.setGrayscaleConfig(config, event.getEventType());
+ MqGrayConfigCache.setCacheConfig(config, event.getEventType());
}
}
}
diff --git a/sermant-plugins/sermant-mq-grayscale/sermant-mq-grayscale-plugin/src/main/java/io/sermant/mq/grayscale/config/MqGrayConfigListener.java b/sermant-plugins/sermant-mq-grayscale/mq-config-service/src/main/java/io/sermant/mq/grayscale/listener/MqGrayConfigListener.java
similarity index 96%
rename from sermant-plugins/sermant-mq-grayscale/sermant-mq-grayscale-plugin/src/main/java/io/sermant/mq/grayscale/config/MqGrayConfigListener.java
rename to sermant-plugins/sermant-mq-grayscale/mq-config-service/src/main/java/io/sermant/mq/grayscale/listener/MqGrayConfigListener.java
index 3ec70e1b11..0300ddf695 100644
--- a/sermant-plugins/sermant-mq-grayscale/sermant-mq-grayscale-plugin/src/main/java/io/sermant/mq/grayscale/config/MqGrayConfigListener.java
+++ b/sermant-plugins/sermant-mq-grayscale/mq-config-service/src/main/java/io/sermant/mq/grayscale/listener/MqGrayConfigListener.java
@@ -14,7 +14,7 @@
* limitations under the License.
*/
-package io.sermant.mq.grayscale.config;
+package io.sermant.mq.grayscale.listener;
import io.sermant.core.service.dynamicconfig.common.DynamicConfigEvent;
import io.sermant.core.service.dynamicconfig.common.DynamicConfigListener;
diff --git a/sermant-plugins/sermant-mq-grayscale/mq-config-service/src/main/java/io/sermant/mq/grayscale/service/MqGrayDynamicConfigService.java b/sermant-plugins/sermant-mq-grayscale/mq-config-service/src/main/java/io/sermant/mq/grayscale/service/MqGrayDynamicConfigService.java
new file mode 100644
index 0000000000..ed8205777e
--- /dev/null
+++ b/sermant-plugins/sermant-mq-grayscale/mq-config-service/src/main/java/io/sermant/mq/grayscale/service/MqGrayDynamicConfigService.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright (C) 2024-2024 Sermant Authors. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.sermant.mq.grayscale.service;
+
+import io.sermant.core.common.LoggerFactory;
+import io.sermant.core.config.ConfigManager;
+import io.sermant.core.plugin.config.ServiceMeta;
+import io.sermant.core.plugin.service.PluginService;
+import io.sermant.core.plugin.subscribe.CommonGroupConfigSubscriber;
+import io.sermant.core.plugin.subscribe.ConfigSubscriber;
+import io.sermant.mq.grayscale.listener.MqGrayConfigListener;
+
+import java.util.logging.Logger;
+
+/**
+ * grayscale dynamic config service
+ *
+ * @author chengyouling
+ * @since 2024-05-27
+ **/
+public class MqGrayDynamicConfigService implements PluginService {
+ private static final Logger LOGGER = LoggerFactory.getLogger();
+
+ @Override
+ public void start() {
+ ConfigSubscriber subscriber = new CommonGroupConfigSubscriber(
+ ConfigManager.getConfig(ServiceMeta.class).getService(), new MqGrayConfigListener(),
+ "mq-grayscale");
+ subscriber.subscribe();
+ LOGGER.info("Success to subscribe mq-grayscale config");
+ }
+}
diff --git a/sermant-plugins/sermant-mq-grayscale/mq-config-service/src/main/resources/META-INF/services/io.sermant.core.plugin.service.PluginService b/sermant-plugins/sermant-mq-grayscale/mq-config-service/src/main/resources/META-INF/services/io.sermant.core.plugin.service.PluginService
new file mode 100644
index 0000000000..ef5284eb25
--- /dev/null
+++ b/sermant-plugins/sermant-mq-grayscale/mq-config-service/src/main/resources/META-INF/services/io.sermant.core.plugin.service.PluginService
@@ -0,0 +1,17 @@
+#
+# Copyright (C) 2024-2024 Sermant Authors. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+io.sermant.mq.grayscale.service.MqGrayDynamicConfigService
\ No newline at end of file
diff --git a/sermant-plugins/sermant-mq-grayscale/sermant-mq-grayscale-plugin/pom.xml b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/pom.xml
similarity index 90%
rename from sermant-plugins/sermant-mq-grayscale/sermant-mq-grayscale-plugin/pom.xml
rename to sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/pom.xml
index 8bf04ab08b..b6710a633f 100644
--- a/sermant-plugins/sermant-mq-grayscale/sermant-mq-grayscale-plugin/pom.xml
+++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/pom.xml
@@ -9,7 +9,7 @@
1.0.0
- sermant-mq-grayscale-plugin
+ mq-grayscale-rocketmq-plugin8
@@ -20,12 +20,9 @@
- com.alibaba
- fastjson
-
-
- org.yaml
- snakeyaml
+ io.sermant
+ mq-config-common
+ ${project.version}io.sermant
@@ -52,6 +49,11 @@
mockito-inlinetest
+
+ com.alibaba
+ fastjson
+ test
+
diff --git a/sermant-plugins/sermant-mq-grayscale/sermant-mq-grayscale-plugin/src/main/java/io/sermant/mq/grayscale/config/MqConsumerClientConfig.java b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/config/RocketMqConsumerClientConfig.java
similarity index 90%
rename from sermant-plugins/sermant-mq-grayscale/sermant-mq-grayscale-plugin/src/main/java/io/sermant/mq/grayscale/config/MqConsumerClientConfig.java
rename to sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/config/RocketMqConsumerClientConfig.java
index 4079594203..2a39d75ec5 100644
--- a/sermant-plugins/sermant-mq-grayscale/sermant-mq-grayscale-plugin/src/main/java/io/sermant/mq/grayscale/config/MqConsumerClientConfig.java
+++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/config/RocketMqConsumerClientConfig.java
@@ -14,7 +14,7 @@
* limitations under the License.
*/
-package io.sermant.mq.grayscale.config;
+package io.sermant.mq.grayscale.rocketmq.config;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
@@ -24,7 +24,7 @@
* @author chengyouling
* @since 2024-05-27
**/
-public class MqConsumerClientConfig {
+public class RocketMqConsumerClientConfig {
private String topic;
private String address;
@@ -40,7 +40,7 @@ public class MqConsumerClientConfig {
* @param topic topic
* @param consumerGroup consumerGroup
*/
- public MqConsumerClientConfig(String address, String topic, String consumerGroup) {
+ public RocketMqConsumerClientConfig(String address, String topic, String consumerGroup) {
this.address = address;
this.topic = topic;
this.consumerGroup = consumerGroup;
diff --git a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqLitePullConsumerConstructorDeclarer.java b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqLitePullConsumerConstructorDeclarer.java
new file mode 100644
index 0000000000..1d3c8d6419
--- /dev/null
+++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqLitePullConsumerConstructorDeclarer.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright (C) 2024-2024 Sermant Authors. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.sermant.mq.grayscale.rocketmq.declarer;
+
+import io.sermant.core.plugin.agent.declarer.AbstractPluginDeclarer;
+import io.sermant.core.plugin.agent.declarer.InterceptDeclarer;
+import io.sermant.core.plugin.agent.matcher.ClassMatcher;
+import io.sermant.core.plugin.agent.matcher.MethodMatcher;
+import io.sermant.mq.grayscale.rocketmq.interceptor.RocketMqConsumerConstructorInterceptor;
+
+/**
+ * lite pull consumer set gray consumer group declarer
+ *
+ * @author chengyouling
+ * @since 2024-09-07
+ **/
+public class RocketMqLitePullConsumerConstructorDeclarer extends AbstractPluginDeclarer {
+ private static final String ENHANCE_CLASS = "org.apache.rocketmq.client.consumer.DefaultLitePullConsumer";
+
+ private static final String[] METHOD_PARAM_TYPES = {
+ "java.lang.String",
+ "java.lang.String",
+ "org.apache.rocketmq.remoting.RPCHook"
+ };
+
+ @Override
+ public ClassMatcher getClassMatcher() {
+ return ClassMatcher.nameEquals(ENHANCE_CLASS);
+ }
+
+ @Override
+ public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) {
+ return new InterceptDeclarer[]{
+ InterceptDeclarer.build(MethodMatcher.isConstructor()
+ .and(MethodMatcher.paramTypesEqual(METHOD_PARAM_TYPES)),
+ new RocketMqConsumerConstructorInterceptor())
+ };
+ }
+}
diff --git a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqLitePullConsumerSubscribeDeclarer.java b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqLitePullConsumerSubscribeDeclarer.java
new file mode 100644
index 0000000000..60090c820f
--- /dev/null
+++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqLitePullConsumerSubscribeDeclarer.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright (C) 2024-2024 Sermant Authors. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.sermant.mq.grayscale.rocketmq.declarer;
+
+import io.sermant.core.plugin.agent.declarer.AbstractPluginDeclarer;
+import io.sermant.core.plugin.agent.declarer.InterceptDeclarer;
+import io.sermant.core.plugin.agent.matcher.ClassMatcher;
+import io.sermant.core.plugin.agent.matcher.MethodMatcher;
+import io.sermant.mq.grayscale.rocketmq.interceptor.RocketMqLitePullConsumerSubscribeInterceptor;
+
+/**
+ * lite pull consumer build consumer client config declarer
+ *
+ * @author chengyouling
+ * @since 2024-05-27
+ **/
+public class RocketMqLitePullConsumerSubscribeDeclarer extends AbstractPluginDeclarer {
+ private static final String ENHANCE_CLASS = "org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl";
+
+ private static final String METHOD_SUBSCRIBE = "subscribe";
+
+ private static final String METHOD_FETCH = "fetchMessageQueues";
+
+ @Override
+ public ClassMatcher getClassMatcher() {
+ return ClassMatcher.nameEquals(ENHANCE_CLASS);
+ }
+
+ @Override
+ public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) {
+ return new InterceptDeclarer[]{
+ InterceptDeclarer.build(
+ MethodMatcher.nameContains(METHOD_SUBSCRIBE, METHOD_FETCH),
+ new RocketMqLitePullConsumerSubscribeInterceptor())
+ };
+ }
+}
diff --git a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqProducerGrayMessageHookDeclarer.java b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqProducerGrayMessageHookDeclarer.java
new file mode 100644
index 0000000000..166411b68f
--- /dev/null
+++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqProducerGrayMessageHookDeclarer.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright (C) 2024-2024 Sermant Authors. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.sermant.mq.grayscale.rocketmq.declarer;
+
+import io.sermant.core.plugin.agent.declarer.AbstractPluginDeclarer;
+import io.sermant.core.plugin.agent.declarer.InterceptDeclarer;
+import io.sermant.core.plugin.agent.matcher.ClassMatcher;
+import io.sermant.core.plugin.agent.matcher.MethodMatcher;
+import io.sermant.mq.grayscale.rocketmq.interceptor.RocketMqProducerGrayMessageHookInterceptor;
+
+/**
+ * SendMessageHook builder declarer
+ *
+ * @author chengyouling
+ * @since 2024-05-27
+ **/
+public class RocketMqProducerGrayMessageHookDeclarer extends AbstractPluginDeclarer {
+ private static final String ENHANCE_CLASS = "org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl";
+
+ private static final String[] METHOD_PARAM_TYPES = {
+ "org.apache.rocketmq.client.producer.DefaultMQProducer",
+ "org.apache.rocketmq.remoting.RPCHook"
+ };
+
+ @Override
+ public ClassMatcher getClassMatcher() {
+ return ClassMatcher.nameEquals(ENHANCE_CLASS);
+ }
+
+ @Override
+ public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) {
+ return new InterceptDeclarer[]{
+ InterceptDeclarer.build(MethodMatcher.isConstructor()
+ .and(MethodMatcher.paramTypesEqual(METHOD_PARAM_TYPES)),
+ new RocketMqProducerGrayMessageHookInterceptor())
+ };
+ }
+}
diff --git a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqPullConsumerConstructorDeclarer.java b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqPullConsumerConstructorDeclarer.java
new file mode 100644
index 0000000000..9d185c9d15
--- /dev/null
+++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqPullConsumerConstructorDeclarer.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright (C) 2024-2024 Sermant Authors. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.sermant.mq.grayscale.rocketmq.declarer;
+
+import io.sermant.core.plugin.agent.declarer.AbstractPluginDeclarer;
+import io.sermant.core.plugin.agent.declarer.InterceptDeclarer;
+import io.sermant.core.plugin.agent.matcher.ClassMatcher;
+import io.sermant.core.plugin.agent.matcher.MethodMatcher;
+import io.sermant.mq.grayscale.rocketmq.interceptor.RocketMqConsumerConstructorInterceptor;
+
+/**
+ * pull consumer set gray consumer group declarer
+ *
+ * @author chengyouling
+ * @since 2024-09-07
+ **/
+public class RocketMqPullConsumerConstructorDeclarer extends AbstractPluginDeclarer {
+ private static final String ENHANCE_CLASS = "org.apache.rocketmq.client.consumer.DefaultMQPullConsumer";
+
+ private static final String[] METHOD_PARAM_TYPES = {
+ "java.lang.String",
+ "java.lang.String",
+ "org.apache.rocketmq.remoting.RPCHook"
+ };
+
+ @Override
+ public ClassMatcher getClassMatcher() {
+ return ClassMatcher.nameEquals(ENHANCE_CLASS);
+ }
+
+ @Override
+ public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) {
+ return new InterceptDeclarer[]{
+ InterceptDeclarer.build(MethodMatcher.isConstructor()
+ .and(MethodMatcher.paramTypesEqual(METHOD_PARAM_TYPES)),
+ new RocketMqConsumerConstructorInterceptor())
+ };
+ }
+}
diff --git a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqPullConsumerFetchDeclarer.java b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqPullConsumerFetchDeclarer.java
new file mode 100644
index 0000000000..9471eafdc7
--- /dev/null
+++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqPullConsumerFetchDeclarer.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright (C) 2024-2024 Sermant Authors. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.sermant.mq.grayscale.rocketmq.declarer;
+
+import io.sermant.core.plugin.agent.declarer.AbstractPluginDeclarer;
+import io.sermant.core.plugin.agent.declarer.InterceptDeclarer;
+import io.sermant.core.plugin.agent.matcher.ClassMatcher;
+import io.sermant.core.plugin.agent.matcher.MethodMatcher;
+import io.sermant.mq.grayscale.rocketmq.interceptor.RocketMqPullConsumerFetchInterceptor;
+
+/**
+ * pull consumer build consumer client config declarer
+ *
+ * @author chengyouling
+ * @since 2024-05-27
+ **/
+public class RocketMqPullConsumerFetchDeclarer extends AbstractPluginDeclarer {
+ private static final String ENHANCE_CLASS = "org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl";
+
+ private static final String METHOD_FETCH_SUBSCRIBE = "fetchSubscribeMessageQueues";
+
+ private static final String METHOD_FETCH_QUEUE = "fetchMessageQueuesInBalance";
+
+ @Override
+ public ClassMatcher getClassMatcher() {
+ return ClassMatcher.nameEquals(ENHANCE_CLASS);
+ }
+
+ @Override
+ public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) {
+ return new InterceptDeclarer[]{
+ InterceptDeclarer.build(MethodMatcher.nameContains(METHOD_FETCH_QUEUE, METHOD_FETCH_SUBSCRIBE),
+ new RocketMqPullConsumerFetchInterceptor())
+ };
+ }
+}
diff --git a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqPullConsumerSubscriptionUpdateDeclarer.java b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqPullConsumerSubscriptionUpdateDeclarer.java
new file mode 100644
index 0000000000..6c22710dfb
--- /dev/null
+++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqPullConsumerSubscriptionUpdateDeclarer.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright (C) 2024-2024 Sermant Authors. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.sermant.mq.grayscale.rocketmq.declarer;
+
+import io.sermant.core.plugin.agent.declarer.AbstractPluginDeclarer;
+import io.sermant.core.plugin.agent.declarer.InterceptDeclarer;
+import io.sermant.core.plugin.agent.matcher.ClassMatcher;
+import io.sermant.core.plugin.agent.matcher.MethodMatcher;
+import io.sermant.mq.grayscale.rocketmq.interceptor.RocketMqPullConsumerSubscriptionUpdateInterceptor;
+
+/**
+ * pull consumer TAG/SQL92 query message statement build declarer
+ *
+ * @author chengyouling
+ * @since 2024-05-27
+ **/
+public class RocketMqPullConsumerSubscriptionUpdateDeclarer extends AbstractPluginDeclarer {
+ private static final String ENHANCE_CLASS = "org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl";
+
+ private static final String METHOD_NAME = "getSubscriptionData";
+
+ @Override
+ public ClassMatcher getClassMatcher() {
+ return ClassMatcher.nameEquals(ENHANCE_CLASS);
+ }
+
+ @Override
+ public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) {
+ return new InterceptDeclarer[]{
+ InterceptDeclarer.build(MethodMatcher.nameEquals(METHOD_NAME),
+ new RocketMqPullConsumerSubscriptionUpdateInterceptor())
+ };
+ }
+}
diff --git a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqPushConsumerConstructorDeclarer.java b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqPushConsumerConstructorDeclarer.java
new file mode 100644
index 0000000000..9a4e2ea30f
--- /dev/null
+++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqPushConsumerConstructorDeclarer.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright (C) 2024-2024 Sermant Authors. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.sermant.mq.grayscale.rocketmq.declarer;
+
+import io.sermant.core.plugin.agent.declarer.AbstractPluginDeclarer;
+import io.sermant.core.plugin.agent.declarer.InterceptDeclarer;
+import io.sermant.core.plugin.agent.matcher.ClassMatcher;
+import io.sermant.core.plugin.agent.matcher.MethodMatcher;
+import io.sermant.mq.grayscale.rocketmq.interceptor.RocketMqConsumerConstructorInterceptor;
+
+/**
+ * push consumer set gray consumer group declarer
+ *
+ * @author chengyouling
+ * @since 2024-09-07
+ **/
+public class RocketMqPushConsumerConstructorDeclarer extends AbstractPluginDeclarer {
+ private static final String ENHANCE_CLASS = "org.apache.rocketmq.client.consumer.DefaultMQPushConsumer";
+
+ private static final String PARAMETER_STRING = "java.lang.String";
+
+ private static final String PARAMETER_HOOK = "org.apache.rocketmq.remoting.RPCHook";
+
+ private static final String PARAMETER_STRATEGY = "org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy";
+
+ private static final String[] METHOD_FOUR_PARAM_TYPES = {
+ PARAMETER_STRING,
+ PARAMETER_STRING,
+ PARAMETER_HOOK,
+ PARAMETER_STRATEGY
+ };
+
+ private static final String[] METHOD_SIX_PARAMS = {
+ PARAMETER_STRING,
+ PARAMETER_STRING,
+ PARAMETER_HOOK,
+ PARAMETER_STRATEGY,
+ "boolean",
+ PARAMETER_STRING
+ };
+
+ @Override
+ public ClassMatcher getClassMatcher() {
+ return ClassMatcher.nameEquals(ENHANCE_CLASS);
+ }
+
+ @Override
+ public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) {
+ return new InterceptDeclarer[]{
+ InterceptDeclarer.build(MethodMatcher.isConstructor()
+ .and(MethodMatcher.paramTypesEqual(METHOD_FOUR_PARAM_TYPES)
+ .or(MethodMatcher.paramTypesEqual(METHOD_SIX_PARAMS))),
+ new RocketMqConsumerConstructorInterceptor())
+ };
+ }
+}
diff --git a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqPushConsumerSubscribeFetchDeclarer.java b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqPushConsumerSubscribeFetchDeclarer.java
new file mode 100644
index 0000000000..1df882cf7b
--- /dev/null
+++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqPushConsumerSubscribeFetchDeclarer.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright (C) 2024-2024 Sermant Authors. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.sermant.mq.grayscale.rocketmq.declarer;
+
+import io.sermant.core.plugin.agent.declarer.AbstractPluginDeclarer;
+import io.sermant.core.plugin.agent.declarer.InterceptDeclarer;
+import io.sermant.core.plugin.agent.matcher.ClassMatcher;
+import io.sermant.core.plugin.agent.matcher.MethodMatcher;
+import io.sermant.mq.grayscale.rocketmq.interceptor.RocketMqPushConsumerSubscribeFetchInterceptor;
+
+/**
+ * push consumer build consumer client config declarer
+ *
+ * @author chengyouling
+ * @since 2024-05-27
+ **/
+public class RocketMqPushConsumerSubscribeFetchDeclarer extends AbstractPluginDeclarer {
+ private static final String ENHANCE_CLASS = "org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl";
+
+ private static final String METHOD_FETCH_SUBSCRIBE = "fetchSubscribeMessageQueues";
+
+ private static final String METHOD_SUBSCRIBE = "subscribe";
+
+ @Override
+ public ClassMatcher getClassMatcher() {
+ return ClassMatcher.nameEquals(ENHANCE_CLASS);
+ }
+
+ @Override
+ public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) {
+ return new InterceptDeclarer[]{
+ InterceptDeclarer.build(MethodMatcher.nameContains(METHOD_SUBSCRIBE, METHOD_FETCH_SUBSCRIBE),
+ new RocketMqPushConsumerSubscribeFetchInterceptor())
+ };
+ }
+}
diff --git a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqSchedulerRebuildSubscriptionDeclarer.java b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqSchedulerRebuildSubscriptionDeclarer.java
new file mode 100644
index 0000000000..0606c8e3dd
--- /dev/null
+++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/RocketMqSchedulerRebuildSubscriptionDeclarer.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright (C) 2024-2024 Sermant Authors. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.sermant.mq.grayscale.rocketmq.declarer;
+
+import io.sermant.core.plugin.agent.declarer.AbstractPluginDeclarer;
+import io.sermant.core.plugin.agent.declarer.InterceptDeclarer;
+import io.sermant.core.plugin.agent.matcher.ClassMatcher;
+import io.sermant.core.plugin.agent.matcher.MethodMatcher;
+import io.sermant.mq.grayscale.rocketmq.interceptor.RocketMqSchedulerRebuildSubscriptionInterceptor;
+
+/**
+ * scheduler update SQL92 query message statement declarer
+ *
+ * @author chengyouling
+ * @since 2024-05-27
+ **/
+public class RocketMqSchedulerRebuildSubscriptionDeclarer extends AbstractPluginDeclarer {
+ private static final String ENHANCE_CLASS = "org.apache.rocketmq.client.impl.consumer.RebalanceImpl";
+
+ private static final String METHOD_NAME = "getSubscriptionInner";
+
+ @Override
+ public ClassMatcher getClassMatcher() {
+ return ClassMatcher.nameEquals(ENHANCE_CLASS);
+ }
+
+ @Override
+ public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) {
+ return new InterceptDeclarer[]{
+ InterceptDeclarer.build(MethodMatcher.nameEquals(METHOD_NAME),
+ new RocketMqSchedulerRebuildSubscriptionInterceptor())
+ };
+ }
+}
diff --git a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqAbstractInterceptor.java b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqAbstractInterceptor.java
new file mode 100644
index 0000000000..70bd388e14
--- /dev/null
+++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqAbstractInterceptor.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright (C) 2024-2024 Sermant Authors. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.sermant.mq.grayscale.rocketmq.interceptor;
+
+import io.sermant.core.plugin.agent.entity.ExecuteContext;
+import io.sermant.core.plugin.agent.interceptor.AbstractInterceptor;
+import io.sermant.core.utils.StringUtils;
+import io.sermant.mq.grayscale.config.MqGrayConfigCache;
+import io.sermant.mq.grayscale.config.rocketmq.RocketMqConfigUtils;
+import io.sermant.mq.grayscale.rocketmq.service.RocketMqConsumerGroupAutoCheck;
+import io.sermant.mq.grayscale.rocketmq.utils.RocketMqGrayscaleConfigUtils;
+import io.sermant.mq.grayscale.rocketmq.utils.RocketMqSubscriptionDataUtils;
+
+/**
+ * mq consumer abstract interceptor
+ *
+ * @author chengyouling
+ * @since 2024-09-04
+ **/
+public abstract class RocketMqAbstractInterceptor extends AbstractInterceptor {
+ @Override
+ public ExecuteContext before(ExecuteContext context) throws Exception {
+ return context;
+ }
+
+ @Override
+ public ExecuteContext after(ExecuteContext context) throws Exception {
+ if (!MqGrayConfigCache.getCacheConfig().isEnabled()) {
+ return context;
+ }
+ return doAfter(context);
+ }
+
+ /**
+ * Handling after the intercept point
+ *
+ * @param context context
+ * @return context
+ * @throws Exception Exception
+ */
+ protected abstract ExecuteContext doAfter(ExecuteContext context) throws Exception;
+
+ /**
+ * build gray group name and set consumer client configs
+ *
+ * @param namesrvAddr namesrvAddr
+ * @param topic topic
+ * @param consumerGroup consumerGroup
+ */
+ protected void buildGroupAndClientConfig(String namesrvAddr, String topic, String consumerGroup) {
+ String grayGroupTag = RocketMqGrayscaleConfigUtils.getGrayGroupTag();
+ String subscribeScope = RocketMqSubscriptionDataUtils.buildSubscribeScope(topic, consumerGroup, namesrvAddr);
+ if (StringUtils.isEmpty(grayGroupTag)) {
+ RocketMqConsumerGroupAutoCheck.setConsumerClientConfig(namesrvAddr, topic, consumerGroup);
+ RocketMqConfigUtils.setBaseGroupTagChangeMap(subscribeScope, true);
+ return;
+ }
+ RocketMqConfigUtils.setGrayGroupTagChangeMap(subscribeScope, true);
+ }
+}
diff --git a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqConsumerConstructorInterceptor.java b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqConsumerConstructorInterceptor.java
new file mode 100644
index 0000000000..e9d30452a4
--- /dev/null
+++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqConsumerConstructorInterceptor.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright (C) 2024-2024 Sermant Authors. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.sermant.mq.grayscale.rocketmq.interceptor;
+
+import io.sermant.core.plugin.agent.entity.ExecuteContext;
+import io.sermant.core.plugin.agent.interceptor.AbstractInterceptor;
+import io.sermant.core.utils.StringUtils;
+import io.sermant.mq.grayscale.config.MqGrayConfigCache;
+import io.sermant.mq.grayscale.rocketmq.utils.RocketMqGrayscaleConfigUtils;
+
+/**
+ * DefaultMQPushConsumer/DefaultLitePullConsumer/DefaultMQPullConsumer Constructor method interceptor
+ * gray scene reset consumerGroup with grayGroupTag
+ *
+ * @author chengyouling
+ * @since 2024-05-27
+ **/
+public class RocketMqConsumerConstructorInterceptor extends AbstractInterceptor {
+ @Override
+ public ExecuteContext before(ExecuteContext context) throws Exception {
+ if (!MqGrayConfigCache.getCacheConfig().isEnabled()) {
+ return context;
+ }
+ String grayGroupTag = RocketMqGrayscaleConfigUtils.getGrayGroupTag();
+ if (StringUtils.isEmpty(grayGroupTag)) {
+ return context;
+ }
+ String originGroup = (String) context.getArguments()[1];
+ context.getArguments()[1]
+ = originGroup.contains("_" + grayGroupTag) ? originGroup : originGroup + "_" + grayGroupTag;
+ return context;
+ }
+
+ @Override
+ public ExecuteContext after(ExecuteContext context) throws Exception {
+ return context;
+ }
+}
diff --git a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqLitePullConsumerSubscribeInterceptor.java b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqLitePullConsumerSubscribeInterceptor.java
new file mode 100644
index 0000000000..987cbf93ef
--- /dev/null
+++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqLitePullConsumerSubscribeInterceptor.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright (C) 2024-2024 Sermant Authors. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.sermant.mq.grayscale.rocketmq.interceptor;
+
+import io.sermant.core.plugin.agent.entity.ExecuteContext;
+
+import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
+import org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl;
+
+/**
+ * LitePullConsumer subscribe/fetchMessageQueues method interceptor
+ * base scene recording namesrvAddr、topic、group info
+ *
+ * @author chengyouling
+ * @since 2024-05-27
+ **/
+public class RocketMqLitePullConsumerSubscribeInterceptor extends RocketMqAbstractInterceptor {
+ @Override
+ public ExecuteContext doAfter(ExecuteContext context) throws Exception {
+ DefaultLitePullConsumerImpl litePullConsumerImpl = (DefaultLitePullConsumerImpl) context.getObject();
+ DefaultLitePullConsumer pullConsumer = litePullConsumerImpl.getDefaultLitePullConsumer();
+ buildGroupAndClientConfig(pullConsumer.getNamesrvAddr(), (String) context.getArguments()[0],
+ pullConsumer.getConsumerGroup());
+ return context;
+ }
+}
diff --git a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqProducerGrayMessageHookInterceptor.java b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqProducerGrayMessageHookInterceptor.java
new file mode 100644
index 0000000000..3da1a3df43
--- /dev/null
+++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqProducerGrayMessageHookInterceptor.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright (C) 2024-2024 Sermant Authors. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.sermant.mq.grayscale.rocketmq.interceptor;
+
+import io.sermant.core.plugin.agent.entity.ExecuteContext;
+import io.sermant.core.utils.ReflectUtils;
+import io.sermant.mq.grayscale.rocketmq.service.RocketMqGraySendMessageHook;
+
+import org.apache.rocketmq.client.hook.SendMessageHook;
+import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
+
+import java.lang.reflect.Method;
+import java.util.Optional;
+
+/**
+ * SendMessageHook builder interceptor
+ *
+ * @author chengyouling
+ * @since 2024-05-27
+ **/
+public class RocketMqProducerGrayMessageHookInterceptor extends RocketMqAbstractInterceptor {
+ @Override
+ public ExecuteContext doAfter(ExecuteContext context) throws Exception {
+ DefaultMQProducerImpl producer = (DefaultMQProducerImpl) context.getObject();
+ Optional method = ReflectUtils.findMethod(producer.getClass(), "registerSendMessageHook",
+ new Class[]{SendMessageHook.class});
+ if (method.isPresent()) {
+ method.get().invoke(producer, new RocketMqGraySendMessageHook());
+ }
+ return context;
+ }
+}
diff --git a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqPullConsumerFetchInterceptor.java b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqPullConsumerFetchInterceptor.java
new file mode 100644
index 0000000000..a8d0943480
--- /dev/null
+++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqPullConsumerFetchInterceptor.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright (C) 2024-2024 Sermant Authors. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.sermant.mq.grayscale.rocketmq.interceptor;
+
+import io.sermant.core.plugin.agent.entity.ExecuteContext;
+
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl;
+
+/**
+ * PullConsumer fetchSubscribeMessageQueues/fetchMessageQueuesInBalance method interceptor
+ * base scene recording namesrvAddr、topic、group info
+ *
+ * @author chengyouling
+ * @since 2024-05-27
+ **/
+public class RocketMqPullConsumerFetchInterceptor extends RocketMqAbstractInterceptor {
+ @Override
+ public ExecuteContext doAfter(ExecuteContext context) throws Exception {
+ DefaultMQPullConsumer pullConsumer =
+ ((DefaultMQPullConsumerImpl) context.getObject()).getDefaultMQPullConsumer();
+ buildGroupAndClientConfig(pullConsumer.getNamesrvAddr(), (String) context.getArguments()[0],
+ pullConsumer.getConsumerGroup());
+ return context;
+ }
+}
diff --git a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqPullConsumerSubscriptionUpdateInterceptor.java b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqPullConsumerSubscriptionUpdateInterceptor.java
new file mode 100644
index 0000000000..efad6f0ff4
--- /dev/null
+++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqPullConsumerSubscriptionUpdateInterceptor.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright (C) 2024-2024 Sermant Authors. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.sermant.mq.grayscale.rocketmq.interceptor;
+
+import io.sermant.core.common.LoggerFactory;
+import io.sermant.core.plugin.agent.entity.ExecuteContext;
+import io.sermant.core.utils.ReflectUtils;
+import io.sermant.core.utils.StringUtils;
+import io.sermant.mq.grayscale.rocketmq.service.RocketMqConsumerGroupAutoCheck;
+import io.sermant.mq.grayscale.rocketmq.utils.RocketMqGrayscaleConfigUtils;
+import io.sermant.mq.grayscale.rocketmq.utils.RocketMqSubscriptionDataUtils;
+
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+
+import java.util.Optional;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * update pull consumer subscription SQL92 query statement interceptor
+ *
+ * @author chengyouling
+ * @since 2024-07-27
+ **/
+public class RocketMqPullConsumerSubscriptionUpdateInterceptor extends RocketMqAbstractInterceptor {
+ private static final Logger LOGGER = LoggerFactory.getLogger();
+
+ @Override
+ public ExecuteContext doAfter(ExecuteContext context) throws Exception {
+ SubscriptionData subscriptionData = (SubscriptionData) context.getResult();
+ if (RocketMqSubscriptionDataUtils.isExpressionTypeInaccurate(subscriptionData.getExpressionType())) {
+ return context;
+ }
+ Optional