From 0d86f2a66627b81af9dd1a5fb2e8b9675909346a Mon Sep 17 00:00:00 2001 From: chengyouling Date: Fri, 20 Dec 2024 17:49:40 +0800 Subject: [PATCH] rocketmq message grayscale support 5.1.x Signed-off-by: chengyouling --- ...51LitePullConsumerConstructorDeclarer.java | 52 ++++++ ...tMqV51PullConsumerConstructorDeclarer.java | 52 ++++++ ...tMqV51PushConsumerConstructorDeclarer.java | 61 ++++++ ...ConsumerSubscriptionUpdateInterceptor.java | 16 +- ...hedulerRebuildSubscriptionInterceptor.java | 15 +- ...etMqV51ConsumerConstructorInterceptor.java | 52 ++++++ .../RocketMqConsumerGroupAutoCheck.java | 22 +-- .../rocketmq/utils/RocketMqReflectUtils.java | 174 ++++++++++++++++++ .../utils/RocketMqSubscriptionDataUtils.java | 24 +-- ....core.plugin.agent.declarer.PluginDeclarer | 5 + .../RocketMqConsumerGroupAutoCheckTest.java | 14 -- 11 files changed, 436 insertions(+), 51 deletions(-) create mode 100644 sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/client51/RocketMqV51LitePullConsumerConstructorDeclarer.java create mode 100644 sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/client51/RocketMqV51PullConsumerConstructorDeclarer.java create mode 100644 sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/client51/RocketMqV51PushConsumerConstructorDeclarer.java create mode 100644 sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/client51/RocketMqV51ConsumerConstructorInterceptor.java create mode 100644 sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/utils/RocketMqReflectUtils.java diff --git a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/client51/RocketMqV51LitePullConsumerConstructorDeclarer.java b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/client51/RocketMqV51LitePullConsumerConstructorDeclarer.java new file mode 100644 index 0000000000..4cde385afd --- /dev/null +++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/client51/RocketMqV51LitePullConsumerConstructorDeclarer.java @@ -0,0 +1,52 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.mq.grayscale.rocketmq.declarer.client51; + +import io.sermant.core.plugin.agent.declarer.AbstractPluginDeclarer; +import io.sermant.core.plugin.agent.declarer.InterceptDeclarer; +import io.sermant.core.plugin.agent.matcher.ClassMatcher; +import io.sermant.core.plugin.agent.matcher.MethodMatcher; +import io.sermant.mq.grayscale.rocketmq.interceptor.client51.RocketMqV51ConsumerConstructorInterceptor; + +/** + * 5.1.x client lite pull consumer set gray consumer group declarer + * + * @author chengyouling + * @since 2024-09-07 + **/ +public class RocketMqV51LitePullConsumerConstructorDeclarer extends AbstractPluginDeclarer { + private static final String ENHANCE_CLASS = "org.apache.rocketmq.client.consumer.DefaultLitePullConsumer"; + + private static final String[] METHOD_PARAM_TYPES = { + "java.lang.String", + "org.apache.rocketmq.remoting.RPCHook" + }; + + @Override + public ClassMatcher getClassMatcher() { + return ClassMatcher.nameEquals(ENHANCE_CLASS); + } + + @Override + public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) { + return new InterceptDeclarer[]{ + InterceptDeclarer.build(MethodMatcher.isConstructor() + .and(MethodMatcher.paramTypesEqual(METHOD_PARAM_TYPES)), + new RocketMqV51ConsumerConstructorInterceptor()) + }; + } +} diff --git a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/client51/RocketMqV51PullConsumerConstructorDeclarer.java b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/client51/RocketMqV51PullConsumerConstructorDeclarer.java new file mode 100644 index 0000000000..7a152709b4 --- /dev/null +++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/client51/RocketMqV51PullConsumerConstructorDeclarer.java @@ -0,0 +1,52 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.mq.grayscale.rocketmq.declarer.client51; + +import io.sermant.core.plugin.agent.declarer.AbstractPluginDeclarer; +import io.sermant.core.plugin.agent.declarer.InterceptDeclarer; +import io.sermant.core.plugin.agent.matcher.ClassMatcher; +import io.sermant.core.plugin.agent.matcher.MethodMatcher; +import io.sermant.mq.grayscale.rocketmq.interceptor.client51.RocketMqV51ConsumerConstructorInterceptor; + +/** + * 5.1.x client pull consumer set gray consumer group declarer + * + * @author chengyouling + * @since 2024-09-07 + **/ +public class RocketMqV51PullConsumerConstructorDeclarer extends AbstractPluginDeclarer { + private static final String ENHANCE_CLASS = "org.apache.rocketmq.client.consumer.DefaultMQPullConsumer"; + + private static final String[] METHOD_PARAM_TYPES = { + "java.lang.String", + "org.apache.rocketmq.remoting.RPCHook" + }; + + @Override + public ClassMatcher getClassMatcher() { + return ClassMatcher.nameEquals(ENHANCE_CLASS); + } + + @Override + public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) { + return new InterceptDeclarer[]{ + InterceptDeclarer.build(MethodMatcher.isConstructor() + .and(MethodMatcher.paramTypesEqual(METHOD_PARAM_TYPES)), + new RocketMqV51ConsumerConstructorInterceptor()) + }; + } +} diff --git a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/client51/RocketMqV51PushConsumerConstructorDeclarer.java b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/client51/RocketMqV51PushConsumerConstructorDeclarer.java new file mode 100644 index 0000000000..29377fa08c --- /dev/null +++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/declarer/client51/RocketMqV51PushConsumerConstructorDeclarer.java @@ -0,0 +1,61 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.mq.grayscale.rocketmq.declarer.client51; + +import io.sermant.core.plugin.agent.declarer.AbstractPluginDeclarer; +import io.sermant.core.plugin.agent.declarer.InterceptDeclarer; +import io.sermant.core.plugin.agent.matcher.ClassMatcher; +import io.sermant.core.plugin.agent.matcher.MethodMatcher; +import io.sermant.mq.grayscale.rocketmq.interceptor.client51.RocketMqV51ConsumerConstructorInterceptor; + +/** + * 5.1.x client push consumer set gray consumer group declarer + * + * @author chengyouling + * @since 2024-09-07 + **/ +public class RocketMqV51PushConsumerConstructorDeclarer extends AbstractPluginDeclarer { + private static final String ENHANCE_CLASS = "org.apache.rocketmq.client.consumer.DefaultMQPushConsumer"; + + private static final String PARAMETER_STRING = "java.lang.String"; + + private static final String PARAMETER_HOOK = "org.apache.rocketmq.remoting.RPCHook"; + + private static final String PARAMETER_STRATEGY = "org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy"; + + private static final String[] METHOD_PARAM_TYPES = { + PARAMETER_STRING, + PARAMETER_HOOK, + PARAMETER_STRATEGY, + "boolean", + PARAMETER_STRING + }; + + @Override + public ClassMatcher getClassMatcher() { + return ClassMatcher.nameEquals(ENHANCE_CLASS); + } + + @Override + public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) { + return new InterceptDeclarer[]{ + InterceptDeclarer.build(MethodMatcher.isConstructor() + .and(MethodMatcher.paramTypesEqual(METHOD_PARAM_TYPES)), + new RocketMqV51ConsumerConstructorInterceptor()) + }; + } +} diff --git a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqPullConsumerSubscriptionUpdateInterceptor.java b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqPullConsumerSubscriptionUpdateInterceptor.java index efad6f0ff4..143408abd7 100644 --- a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqPullConsumerSubscriptionUpdateInterceptor.java +++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqPullConsumerSubscriptionUpdateInterceptor.java @@ -22,12 +22,12 @@ import io.sermant.core.utils.StringUtils; import io.sermant.mq.grayscale.rocketmq.service.RocketMqConsumerGroupAutoCheck; import io.sermant.mq.grayscale.rocketmq.utils.RocketMqGrayscaleConfigUtils; +import io.sermant.mq.grayscale.rocketmq.utils.RocketMqReflectUtils; import io.sermant.mq.grayscale.rocketmq.utils.RocketMqSubscriptionDataUtils; import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; import org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl; import org.apache.rocketmq.client.impl.factory.MQClientInstance; -import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import java.util.Optional; import java.util.logging.Level; @@ -44,8 +44,9 @@ public class RocketMqPullConsumerSubscriptionUpdateInterceptor extends RocketMqA @Override public ExecuteContext doAfter(ExecuteContext context) throws Exception { - SubscriptionData subscriptionData = (SubscriptionData) context.getResult(); - if (RocketMqSubscriptionDataUtils.isExpressionTypeInaccurate(subscriptionData.getExpressionType())) { + Object subscriptionData = context.getResult(); + if (RocketMqSubscriptionDataUtils + .isExpressionTypeInaccurate(RocketMqReflectUtils.getExpressionType(subscriptionData))) { return context; } Optional fieldValue = ReflectUtils.getFieldValue(context.getObject(), "mQClientFactory"); @@ -58,19 +59,20 @@ public ExecuteContext doAfter(ExecuteContext context) throws Exception { return context; } - private void buildSql92SubscriptionData(ExecuteContext context, SubscriptionData subscriptionData, + private void buildSql92SubscriptionData(ExecuteContext context, Object subscriptionData, MQClientInstance instance) { DefaultMQPullConsumer pullConsumer = ((DefaultMQPullConsumerImpl) context.getObject()).getDefaultMQPullConsumer(); String consumerGroup = pullConsumer.getConsumerGroup(); if (StringUtils.isEmpty(RocketMqGrayscaleConfigUtils.getGrayGroupTag())) { - RocketMqConsumerGroupAutoCheck.setMqClientInstance(subscriptionData.getTopic(), consumerGroup, instance); + RocketMqConsumerGroupAutoCheck.setMqClientInstance(RocketMqReflectUtils.getTopic(subscriptionData), + consumerGroup, instance); RocketMqConsumerGroupAutoCheck.syncUpdateCacheGrayTags(); RocketMqConsumerGroupAutoCheck.startSchedulerCheckGroupTask(); } String namesrvAddr = instance.getClientConfig().getNamesrvAddr(); - String subscribeScope = RocketMqSubscriptionDataUtils.buildSubscribeScope(subscriptionData.getTopic(), - consumerGroup, namesrvAddr); + String subscribeScope = RocketMqSubscriptionDataUtils + .buildSubscribeScope(RocketMqReflectUtils.getTopic(subscriptionData), consumerGroup, namesrvAddr); RocketMqSubscriptionDataUtils.resetsSql92SubscriptionData(subscriptionData, subscribeScope); } } diff --git a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqSchedulerRebuildSubscriptionInterceptor.java b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqSchedulerRebuildSubscriptionInterceptor.java index 718f9f29d7..d93a0924f7 100644 --- a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqSchedulerRebuildSubscriptionInterceptor.java +++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/RocketMqSchedulerRebuildSubscriptionInterceptor.java @@ -20,11 +20,11 @@ import io.sermant.core.utils.StringUtils; import io.sermant.mq.grayscale.rocketmq.service.RocketMqConsumerGroupAutoCheck; import io.sermant.mq.grayscale.rocketmq.utils.RocketMqGrayscaleConfigUtils; +import io.sermant.mq.grayscale.rocketmq.utils.RocketMqReflectUtils; import io.sermant.mq.grayscale.rocketmq.utils.RocketMqSubscriptionDataUtils; import org.apache.rocketmq.client.impl.consumer.RebalanceImpl; import org.apache.rocketmq.client.impl.factory.MQClientInstance; -import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import java.util.concurrent.ConcurrentMap; @@ -39,13 +39,14 @@ public class RocketMqSchedulerRebuildSubscriptionInterceptor extends RocketMqAbs @Override public ExecuteContext doAfter(ExecuteContext context) throws Exception { - ConcurrentMap map = (ConcurrentMap) context.getResult(); + ConcurrentMap map = (ConcurrentMap) context.getResult(); RebalanceImpl balance = (RebalanceImpl) context.getObject(); if (balance.getConsumerGroup() == null) { return context; } - for (SubscriptionData subscriptionData : map.values()) { - if (RocketMqSubscriptionDataUtils.isExpressionTypeInaccurate(subscriptionData.getExpressionType())) { + for (Object subscriptionData : map.values()) { + if (RocketMqSubscriptionDataUtils + .isExpressionTypeInaccurate(RocketMqReflectUtils.getExpressionType(subscriptionData))) { continue; } buildSql92SubscriptionData(subscriptionData, balance); @@ -53,9 +54,9 @@ public ExecuteContext doAfter(ExecuteContext context) throws Exception { return context; } - private void buildSql92SubscriptionData(SubscriptionData subscriptionData, RebalanceImpl balance) { + private void buildSql92SubscriptionData(Object subscriptionData, RebalanceImpl balance) { synchronized (lock) { - String topic = subscriptionData.getTopic(); + String topic = RocketMqReflectUtils.getTopic(subscriptionData); if (!RocketMqSubscriptionDataUtils.getGrayTagChangeFlag(topic, balance)) { return; } @@ -74,7 +75,7 @@ private void buildSql92SubscriptionData(SubscriptionData subscriptionData, Rebal } } - private void resetsSql92SubscriptionData(String topic, String consumerGroup, SubscriptionData subscriptionData, + private void resetsSql92SubscriptionData(String topic, String consumerGroup, Object subscriptionData, String namesrvAddr) { String subscribeScope = RocketMqSubscriptionDataUtils.buildSubscribeScope(topic, consumerGroup, namesrvAddr); diff --git a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/client51/RocketMqV51ConsumerConstructorInterceptor.java b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/client51/RocketMqV51ConsumerConstructorInterceptor.java new file mode 100644 index 0000000000..9f244fea0d --- /dev/null +++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/interceptor/client51/RocketMqV51ConsumerConstructorInterceptor.java @@ -0,0 +1,52 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.mq.grayscale.rocketmq.interceptor.client51; + +import io.sermant.core.plugin.agent.entity.ExecuteContext; +import io.sermant.core.plugin.agent.interceptor.AbstractInterceptor; +import io.sermant.core.utils.StringUtils; +import io.sermant.mq.grayscale.config.MqGrayConfigCache; +import io.sermant.mq.grayscale.rocketmq.utils.RocketMqGrayscaleConfigUtils; + +/** + * upper 5.1.x DefaultMQPushConsumer/DefaultLitePullConsumer/DefaultMQPullConsumer Constructor method interceptor + * gray scene reset consumerGroup with grayGroupTag + * + * @author chengyouling + * @since 2024-05-27 + **/ +public class RocketMqV51ConsumerConstructorInterceptor extends AbstractInterceptor { + @Override + public ExecuteContext before(ExecuteContext context) throws Exception { + if (!MqGrayConfigCache.getCacheConfig().isEnabled()) { + return context; + } + String grayGroupTag = RocketMqGrayscaleConfigUtils.getGrayGroupTag(); + if (StringUtils.isEmpty(grayGroupTag)) { + return context; + } + String originGroup = (String) context.getArguments()[0]; + context.getArguments()[0] + = originGroup.contains("_" + grayGroupTag) ? originGroup : originGroup + "_" + grayGroupTag; + return context; + } + + @Override + public ExecuteContext after(ExecuteContext context) throws Exception { + return context; + } +} diff --git a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/service/RocketMqConsumerGroupAutoCheck.java b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/service/RocketMqConsumerGroupAutoCheck.java index 11f133406c..2bca36832a 100644 --- a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/service/RocketMqConsumerGroupAutoCheck.java +++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/service/RocketMqConsumerGroupAutoCheck.java @@ -22,6 +22,7 @@ import io.sermant.mq.grayscale.config.MqGrayConfigCache; import io.sermant.mq.grayscale.rocketmq.config.RocketMqConsumerClientConfig; import io.sermant.mq.grayscale.rocketmq.utils.RocketMqGrayscaleConfigUtils; +import io.sermant.mq.grayscale.rocketmq.utils.RocketMqReflectUtils; import io.sermant.mq.grayscale.rocketmq.utils.RocketMqSubscriptionDataUtils; import org.apache.commons.lang3.StringUtils; @@ -29,9 +30,6 @@ import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.impl.MQClientAPIImpl; import org.apache.rocketmq.client.impl.factory.MQClientInstance; -import org.apache.rocketmq.common.protocol.body.GroupList; -import org.apache.rocketmq.common.protocol.route.BrokerData; -import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.remoting.exception.RemotingConnectException; import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; @@ -158,22 +156,22 @@ private static Set findGrayConsumerGroupAndGetTags(RocketMqConsumerClien try { MQClientAPIImpl mqClientApi = clientConfig.getMqClientInstance().getMQClientAPIImpl(); String brokerAddress = getBrokerAddress(clientConfig.getTopic(), mqClientApi); - GroupList groupList = mqClientApi.queryTopicConsumeByWho(brokerAddress, clientConfig.getTopic(), - ROCKET_MQ_READ_TIMEOUT); + Object groupList = RocketMqReflectUtils.queryTopicConsumeByWho(mqClientApi, brokerAddress, + clientConfig.getTopic(), ROCKET_MQ_READ_TIMEOUT); return getGrayTagsByConsumerGroup(groupList, brokerAddress, mqClientApi, clientConfig.getConsumerGroup()); } catch (MQClientException | InterruptedException | RemotingTimeoutException | RemotingSendRequestException - | RemotingConnectException | MQBrokerException e) { + | RemotingConnectException e) { LOGGER.log(Level.FINE, String.format(Locale.ENGLISH, "[auto-check] error, message: %s", e.getMessage()), e); } return new HashSet<>(); } - private static Set getGrayTagsByConsumerGroup(GroupList groupList, String brokerAddress, + private static Set getGrayTagsByConsumerGroup(Object groupList, String brokerAddress, MQClientAPIImpl mqClientApi, String consumerGroup) { Set grayTags = new HashSet<>(); - for (String group : groupList.getGroupList()) { + for (String group : RocketMqReflectUtils.getGroupList(groupList)) { try { List consumerIds = mqClientApi.getConsumerIdListByGroup(brokerAddress, group, ROCKET_MQ_READ_TIMEOUT); @@ -196,11 +194,11 @@ private static Set getGrayTagsByConsumerGroup(GroupList groupList, Strin private static String getBrokerAddress(String topic, MQClientAPIImpl mqClientApi) throws RemotingSendRequestException, RemotingConnectException, RemotingTimeoutException, InterruptedException, MQClientException { - TopicRouteData topicRouteData = mqClientApi.getTopicRouteInfoFromNameServer(topic, ROCKET_MQ_READ_TIMEOUT, - false); + Object topicRouteData = RocketMqReflectUtils.getTopicRouteInfoFromNameServer(mqClientApi, topic, + ROCKET_MQ_READ_TIMEOUT, false); List brokerList = new ArrayList<>(); - for (BrokerData brokerData : topicRouteData.getBrokerDatas()) { - brokerList.addAll(brokerData.getBrokerAddrs().values()); + for (Object brokerData : RocketMqReflectUtils.getBrokerDatas(topicRouteData)) { + brokerList.addAll(RocketMqReflectUtils.getBrokerAddrs(brokerData).values()); } // cluster mode has multiple addresses, just select one diff --git a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/utils/RocketMqReflectUtils.java b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/utils/RocketMqReflectUtils.java new file mode 100644 index 0000000000..b6b30a2d49 --- /dev/null +++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/utils/RocketMqReflectUtils.java @@ -0,0 +1,174 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.mq.grayscale.rocketmq.utils; + +import io.sermant.core.utils.ReflectUtils; + +import org.apache.rocketmq.client.impl.MQClientAPIImpl; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +/** + * 5.0.x/5.1.x path difference class reflect utils + * + * @author chengyouling + * @since 2024-12-19 + */ +public class RocketMqReflectUtils { + private RocketMqReflectUtils() { + } + + /** + * get groups + * + * @param groupList GroupList + * @return groupList + */ + public static Set getGroupList(Object groupList) { + Optional groupSetsOpt = ReflectUtils.invokeMethodWithNoneParameter(groupList, "getGroupList"); + return groupSetsOpt.map(groupSets -> (Set) groupSets).orElseGet(HashSet::new); + } + + /** + * get brokerDatas + * + * @param topicRouteData topicRouteData + * @return brokerDatas + */ + public static List getBrokerDatas(Object topicRouteData) { + Optional brokerDatasOpt = ReflectUtils.invokeMethodWithNoneParameter(topicRouteData, "getBrokerDatas"); + return brokerDatasOpt.map(brokerDatas -> (List) brokerDatas).orElseGet(ArrayList::new); + } + + /** + * get brokerAddrs + * + * @param brokerData brokerData + * @return brokerAddrs + */ + public static Map getBrokerAddrs(Object brokerData) { + Optional brokerAddrsOpt = ReflectUtils.invokeMethodWithNoneParameter(brokerData, "getBrokerAddrs"); + return brokerAddrsOpt.map(brokerAddrs -> (Map) brokerAddrs).orElseGet(HashMap::new); + } + + /** + * get expressionType + * + * @param subscriptionData subscriptionData + * @return expressionType + */ + public static String getExpressionType(Object subscriptionData) { + Optional expressionTypeOpt = ReflectUtils.invokeMethodWithNoneParameter(subscriptionData, + "getExpressionType"); + return expressionTypeOpt.map(expressionType -> (String) expressionType).orElse(""); + } + + /** + * set SubscriptionDatae info + * + * @param subscriptionData subscriptionData + * @param methodName methodName + * @param paramsType paramsType + * @param params params + */ + public static void setSubscriptionDatae(Object subscriptionData, String methodName, Class[] paramsType, + Object[] params) { + ReflectUtils.invokeMethod(subscriptionData, methodName, paramsType, params); + } + + /** + * get topic + * + * @param subscriptionData subscriptionData + * @return topic + */ + public static String getTopic(Object subscriptionData) { + Optional topicOpt = ReflectUtils.invokeMethodWithNoneParameter(subscriptionData, "getTopic"); + return topicOpt.map(topic -> (String) topic).orElse(""); + } + + /** + * get subString + * + * @param subscriptionData subscriptionData + * @return subString + */ + public static String getSubString(Object subscriptionData) { + Optional substrOpt = ReflectUtils.invokeMethodWithNoneParameter(subscriptionData, "getSubString"); + return substrOpt.map(substr -> (String) substr).orElse(""); + } + + /** + * get tags set + * + * @param subscriptionData subscriptionData + * @return tagsSet + */ + public static Set getTagsSet(Object subscriptionData) { + Optional tagsSetOpt = ReflectUtils.invokeMethodWithNoneParameter(subscriptionData, "getTagsSet"); + return tagsSetOpt.map(tagsSet -> (Set) tagsSet).orElse(new HashSet<>()); + } + + /** + * get code set + * + * @param subscriptionData subscriptionData + * @return codeSet + */ + public static Set getCodeSet(Object subscriptionData) { + Optional codeSetOpt = ReflectUtils.invokeMethodWithNoneParameter(subscriptionData, "getCodeSet"); + return codeSetOpt.map(codeSet -> (Set) codeSet).orElse(new HashSet<>()); + } + + /** + * get topicRouteData + * + * @param mqClientApi mqClientApi + * @param topic topic + * @param timeout timeout + * @param allowTopicNotExist allowTopicNotExist + * @return topicRouteData + */ + public static Object getTopicRouteInfoFromNameServer(MQClientAPIImpl mqClientApi, String topic, long timeout, + boolean allowTopicNotExist) { + Optional topicRouteData = ReflectUtils.invokeMethod(mqClientApi, "getTopicRouteInfoFromNameServer", + new Class[]{String.class, long.class, boolean.class}, new Object[]{topic, timeout, allowTopicNotExist}); + return topicRouteData.orElseGet(Object::new); + } + + /** + * get groupList + * + * @param mqClientApi mqClientApi + * @param brokerAddress brokerAddress + * @param topic topic + * @param timeout timeout + * @return groupList + */ + public static Object queryTopicConsumeByWho(MQClientAPIImpl mqClientApi, String brokerAddress, String topic, + long timeout) { + Optional groupList = ReflectUtils.invokeMethod(mqClientApi, "queryTopicConsumeByWho", + new Class[]{String.class, String.class, long.class}, new Object[]{brokerAddress, topic, timeout}); + return groupList.orElseGet(Object::new); + } +} diff --git a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/utils/RocketMqSubscriptionDataUtils.java b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/utils/RocketMqSubscriptionDataUtils.java index eddc3c3fbb..cfde548684 100644 --- a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/utils/RocketMqSubscriptionDataUtils.java +++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/java/io/sermant/mq/grayscale/rocketmq/utils/RocketMqSubscriptionDataUtils.java @@ -26,7 +26,6 @@ import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.impl.consumer.RebalanceImpl; -import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import java.util.ArrayList; import java.util.HashMap; @@ -274,24 +273,27 @@ private static boolean containsGrayTags(String condition) { * @param subscriptionData subscriptionData * @param subscribeScope subscribeScope */ - public static void resetsSql92SubscriptionData(SubscriptionData subscriptionData, String subscribeScope) { + public static void resetsSql92SubscriptionData(Object subscriptionData, String subscribeScope) { String originSubData; - if (EXPRESSION_TYPE_TAG.equals(subscriptionData.getExpressionType())) { - originSubData = buildSql92ExpressionByTags(subscriptionData.getTagsSet()); + if (EXPRESSION_TYPE_TAG.equals(RocketMqReflectUtils.getExpressionType(subscriptionData))) { + originSubData = buildSql92ExpressionByTags(RocketMqReflectUtils.getTagsSet(subscriptionData)); } else { - originSubData = subscriptionData.getSubString(); + originSubData = RocketMqReflectUtils.getSubString(subscriptionData); } String newSubStr = addGrayTagsToSql92Expression(originSubData, subscribeScope); if (StringUtils.isEmpty(newSubStr)) { newSubStr = SELECT_ALL_MESSAGE_SQL; } - if (EXPRESSION_TYPE_TAG.equals(subscriptionData.getExpressionType())) { - subscriptionData.setExpressionType("SQL92"); - subscriptionData.getTagsSet().clear(); - subscriptionData.getCodeSet().clear(); + if (EXPRESSION_TYPE_TAG.equals(RocketMqReflectUtils.getExpressionType(subscriptionData))) { + RocketMqReflectUtils.setSubscriptionDatae(subscriptionData, "setExpressionType", + new Class[]{String.class}, new Object[]{"SQL92"}); + RocketMqReflectUtils.getTagsSet(subscriptionData).clear(); + RocketMqReflectUtils.getCodeSet(subscriptionData).clear(); } - subscriptionData.setSubString(newSubStr); - subscriptionData.setSubVersion(System.currentTimeMillis()); + RocketMqReflectUtils.setSubscriptionDatae(subscriptionData, "setSubString", + new Class[]{String.class}, new Object[]{newSubStr}); + RocketMqReflectUtils.setSubscriptionDatae(subscriptionData, "setSubVersion", + new Class[]{Long.class}, new Object[]{System.currentTimeMillis()}); LOGGER.warning(String.format(Locale.ENGLISH, "update [key: %s] SQL92 subscriptionData, originSubStr: " + "[%s], newSubStr: [%s]", subscribeScope, originSubData, newSubStr)); } diff --git a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/resources/META-INF/services/io.sermant.core.plugin.agent.declarer.PluginDeclarer b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/resources/META-INF/services/io.sermant.core.plugin.agent.declarer.PluginDeclarer index b1cf6af8f9..2e8db9b9e5 100644 --- a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/resources/META-INF/services/io.sermant.core.plugin.agent.declarer.PluginDeclarer +++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/main/resources/META-INF/services/io.sermant.core.plugin.agent.declarer.PluginDeclarer @@ -23,3 +23,8 @@ io.sermant.mq.grayscale.rocketmq.declarer.RocketMqPushConsumerSubscribeFetchDecl io.sermant.mq.grayscale.rocketmq.declarer.RocketMqLitePullConsumerConstructorDeclarer io.sermant.mq.grayscale.rocketmq.declarer.RocketMqPullConsumerConstructorDeclarer io.sermant.mq.grayscale.rocketmq.declarer.RocketMqPushConsumerConstructorDeclarer + +# UPPER 5.1.x client +io.sermant.mq.grayscale.rocketmq.declarer.client51.RocketMqV51LitePullConsumerConstructorDeclarer +io.sermant.mq.grayscale.rocketmq.declarer.client51.RocketMqV51PullConsumerConstructorDeclarer +io.sermant.mq.grayscale.rocketmq.declarer.client51.RocketMqV51PushConsumerConstructorDeclarer diff --git a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/test/java/io/sermant/mq/grayscale/rocketmq/service/RocketMqConsumerGroupAutoCheckTest.java b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/test/java/io/sermant/mq/grayscale/rocketmq/service/RocketMqConsumerGroupAutoCheckTest.java index e081339c43..846d375a8a 100644 --- a/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/test/java/io/sermant/mq/grayscale/rocketmq/service/RocketMqConsumerGroupAutoCheckTest.java +++ b/sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin/src/test/java/io/sermant/mq/grayscale/rocketmq/service/RocketMqConsumerGroupAutoCheckTest.java @@ -61,24 +61,10 @@ public void testExpressionByAutoFindGrayGroup() throws Exception { MQClientAPIImpl mqClientAPI = Mockito.mock(MQClientAPIImpl.class); Mockito.when(instance.getMQClientAPIImpl()).thenReturn(mqClientAPI); - TopicRouteData topicRouteData = Mockito.mock(TopicRouteData.class); - Mockito.when(mqClientAPI.getTopicRouteInfoFromNameServer("TEST-TOPIC", 5000L, false)) - .thenReturn(topicRouteData); - - HashMap brokerAddrs = new HashMap<>(); - brokerAddrs.put(123L, "127.0.0.1:9876"); - BrokerData brokerData = new BrokerData(); - brokerData.setBrokerAddrs(brokerAddrs); - List brokerDataList = new ArrayList<>(); - brokerDataList.add(brokerData); - Mockito.when(topicRouteData.getBrokerDatas()).thenReturn(brokerDataList); - GroupList groupList = new GroupList(); HashSet list = new HashSet<>(); list.add("group_junit_gray"); groupList.setGroupList(list); - Mockito.when(mqClientAPI.queryTopicConsumeByWho("127.0.0.1:9876", "TEST-TOPIC", 5000L)) - .thenReturn(groupList); List consumerIds = new ArrayList<>(); consumerIds.add("123");