Skip to content

Commit

Permalink
rocketmq message grayscale support 5.1.x
Browse files Browse the repository at this point in the history
Signed-off-by: chengyouling <[email protected]>
  • Loading branch information
chengyouling committed Jan 2, 2025
1 parent d8a2864 commit 6aae697
Show file tree
Hide file tree
Showing 11 changed files with 472 additions and 52 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright (C) 2024-2024 Sermant Authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.sermant.mq.grayscale.rocketmq.declarer.client51;

import io.sermant.core.plugin.agent.declarer.AbstractPluginDeclarer;
import io.sermant.core.plugin.agent.declarer.InterceptDeclarer;
import io.sermant.core.plugin.agent.matcher.ClassMatcher;
import io.sermant.core.plugin.agent.matcher.MethodMatcher;
import io.sermant.mq.grayscale.rocketmq.interceptor.client51.RocketMqV51ConsumerConstructorInterceptor;

/**
* 5.1.x client lite pull consumer set gray consumer group declarer
*
* @author chengyouling
* @since 2024-09-07
**/
public class RocketMqV51LitePullConsumerConstructorDeclarer extends AbstractPluginDeclarer {
private static final String ENHANCE_CLASS = "org.apache.rocketmq.client.consumer.DefaultLitePullConsumer";

private static final String[] METHOD_PARAM_TYPES = {
"java.lang.String",
"org.apache.rocketmq.remoting.RPCHook"
};

@Override
public ClassMatcher getClassMatcher() {
return ClassMatcher.nameEquals(ENHANCE_CLASS);
}

@Override
public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) {
return new InterceptDeclarer[]{
InterceptDeclarer.build(MethodMatcher.isConstructor()
.and(MethodMatcher.paramTypesEqual(METHOD_PARAM_TYPES)),
new RocketMqV51ConsumerConstructorInterceptor())
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright (C) 2024-2024 Sermant Authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.sermant.mq.grayscale.rocketmq.declarer.client51;

import io.sermant.core.plugin.agent.declarer.AbstractPluginDeclarer;
import io.sermant.core.plugin.agent.declarer.InterceptDeclarer;
import io.sermant.core.plugin.agent.matcher.ClassMatcher;
import io.sermant.core.plugin.agent.matcher.MethodMatcher;
import io.sermant.mq.grayscale.rocketmq.interceptor.client51.RocketMqV51ConsumerConstructorInterceptor;

/**
* 5.1.x client pull consumer set gray consumer group declarer
*
* @author chengyouling
* @since 2024-09-07
**/
public class RocketMqV51PullConsumerConstructorDeclarer extends AbstractPluginDeclarer {
private static final String ENHANCE_CLASS = "org.apache.rocketmq.client.consumer.DefaultMQPullConsumer";

private static final String[] METHOD_PARAM_TYPES = {
"java.lang.String",
"org.apache.rocketmq.remoting.RPCHook"
};

@Override
public ClassMatcher getClassMatcher() {
return ClassMatcher.nameEquals(ENHANCE_CLASS);
}

@Override
public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) {
return new InterceptDeclarer[]{
InterceptDeclarer.build(MethodMatcher.isConstructor()
.and(MethodMatcher.paramTypesEqual(METHOD_PARAM_TYPES)),
new RocketMqV51ConsumerConstructorInterceptor())
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright (C) 2024-2024 Sermant Authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.sermant.mq.grayscale.rocketmq.declarer.client51;

import io.sermant.core.plugin.agent.declarer.AbstractPluginDeclarer;
import io.sermant.core.plugin.agent.declarer.InterceptDeclarer;
import io.sermant.core.plugin.agent.matcher.ClassMatcher;
import io.sermant.core.plugin.agent.matcher.MethodMatcher;
import io.sermant.mq.grayscale.rocketmq.interceptor.client51.RocketMqV51ConsumerConstructorInterceptor;

/**
* 5.1.x client push consumer set gray consumer group declarer
*
* @author chengyouling
* @since 2024-09-07
**/
public class RocketMqV51PushConsumerConstructorDeclarer extends AbstractPluginDeclarer {
private static final String ENHANCE_CLASS = "org.apache.rocketmq.client.consumer.DefaultMQPushConsumer";

private static final String PARAMETER_STRING = "java.lang.String";

private static final String PARAMETER_HOOK = "org.apache.rocketmq.remoting.RPCHook";

private static final String PARAMETER_STRATEGY = "org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy";

private static final String[] METHOD_PARAM_TYPES = {
PARAMETER_STRING,
PARAMETER_HOOK,
PARAMETER_STRATEGY,
"boolean",
PARAMETER_STRING
};

private static final String[] METHOD_PARAM_THREE_TYPES = {
PARAMETER_STRING,
PARAMETER_HOOK,
PARAMETER_STRATEGY
};

private static final String[] METHOD_PARAM_TWO_TYPES = {
PARAMETER_STRING,
PARAMETER_HOOK
};

@Override
public ClassMatcher getClassMatcher() {
return ClassMatcher.nameEquals(ENHANCE_CLASS);
}

@Override
public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) {
return new InterceptDeclarer[]{
InterceptDeclarer.build(MethodMatcher.isConstructor()
.and(MethodMatcher.paramTypesEqual(METHOD_PARAM_TYPES)
.or(MethodMatcher.paramTypesEqual(METHOD_PARAM_THREE_TYPES))
.or(MethodMatcher.paramTypesEqual(METHOD_PARAM_TWO_TYPES))),
new RocketMqV51ConsumerConstructorInterceptor())
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@
import io.sermant.core.utils.StringUtils;
import io.sermant.mq.grayscale.rocketmq.service.RocketMqConsumerGroupAutoCheck;
import io.sermant.mq.grayscale.rocketmq.utils.RocketMqGrayscaleConfigUtils;
import io.sermant.mq.grayscale.rocketmq.utils.RocketMqReflectUtils;
import io.sermant.mq.grayscale.rocketmq.utils.RocketMqSubscriptionDataUtils;

import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;

import java.util.Optional;
import java.util.logging.Level;
Expand All @@ -44,8 +44,9 @@ public class RocketMqPullConsumerSubscriptionUpdateInterceptor extends RocketMqA

@Override
public ExecuteContext doAfter(ExecuteContext context) throws Exception {
SubscriptionData subscriptionData = (SubscriptionData) context.getResult();
if (RocketMqSubscriptionDataUtils.isExpressionTypeInaccurate(subscriptionData.getExpressionType())) {
Object subscriptionData = context.getResult();
if (RocketMqSubscriptionDataUtils
.isExpressionTypeInaccurate(RocketMqReflectUtils.getExpressionType(subscriptionData))) {
return context;
}
Optional<Object> fieldValue = ReflectUtils.getFieldValue(context.getObject(), "mQClientFactory");
Expand All @@ -58,19 +59,20 @@ public ExecuteContext doAfter(ExecuteContext context) throws Exception {
return context;
}

private void buildSql92SubscriptionData(ExecuteContext context, SubscriptionData subscriptionData,
private void buildSql92SubscriptionData(ExecuteContext context, Object subscriptionData,
MQClientInstance instance) {
DefaultMQPullConsumer pullConsumer
= ((DefaultMQPullConsumerImpl) context.getObject()).getDefaultMQPullConsumer();
String consumerGroup = pullConsumer.getConsumerGroup();
if (StringUtils.isEmpty(RocketMqGrayscaleConfigUtils.getGrayGroupTag())) {
RocketMqConsumerGroupAutoCheck.setMqClientInstance(subscriptionData.getTopic(), consumerGroup, instance);
RocketMqConsumerGroupAutoCheck.setMqClientInstance(RocketMqReflectUtils.getTopic(subscriptionData),
consumerGroup, instance);
RocketMqConsumerGroupAutoCheck.syncUpdateCacheGrayTags();
RocketMqConsumerGroupAutoCheck.startSchedulerCheckGroupTask();
}
String namesrvAddr = instance.getClientConfig().getNamesrvAddr();
String subscribeScope = RocketMqSubscriptionDataUtils.buildSubscribeScope(subscriptionData.getTopic(),
consumerGroup, namesrvAddr);
String subscribeScope = RocketMqSubscriptionDataUtils
.buildSubscribeScope(RocketMqReflectUtils.getTopic(subscriptionData), consumerGroup, namesrvAddr);
RocketMqSubscriptionDataUtils.resetsSql92SubscriptionData(subscriptionData, subscribeScope);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@
import io.sermant.core.utils.StringUtils;
import io.sermant.mq.grayscale.rocketmq.service.RocketMqConsumerGroupAutoCheck;
import io.sermant.mq.grayscale.rocketmq.utils.RocketMqGrayscaleConfigUtils;
import io.sermant.mq.grayscale.rocketmq.utils.RocketMqReflectUtils;
import io.sermant.mq.grayscale.rocketmq.utils.RocketMqSubscriptionDataUtils;

import org.apache.rocketmq.client.impl.consumer.RebalanceImpl;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;

import java.util.concurrent.ConcurrentMap;

Expand All @@ -39,23 +39,24 @@ public class RocketMqSchedulerRebuildSubscriptionInterceptor extends RocketMqAbs

@Override
public ExecuteContext doAfter(ExecuteContext context) throws Exception {
ConcurrentMap<String, SubscriptionData> map = (ConcurrentMap<String, SubscriptionData>) context.getResult();
ConcurrentMap<String, Object> map = (ConcurrentMap<String, Object>) context.getResult();
RebalanceImpl balance = (RebalanceImpl) context.getObject();
if (balance.getConsumerGroup() == null) {
return context;
}
for (SubscriptionData subscriptionData : map.values()) {
if (RocketMqSubscriptionDataUtils.isExpressionTypeInaccurate(subscriptionData.getExpressionType())) {
for (Object subscriptionData : map.values()) {
if (RocketMqSubscriptionDataUtils
.isExpressionTypeInaccurate(RocketMqReflectUtils.getExpressionType(subscriptionData))) {
continue;
}
buildSql92SubscriptionData(subscriptionData, balance);
}
return context;
}

private void buildSql92SubscriptionData(SubscriptionData subscriptionData, RebalanceImpl balance) {
private void buildSql92SubscriptionData(Object subscriptionData, RebalanceImpl balance) {
synchronized (lock) {
String topic = subscriptionData.getTopic();
String topic = RocketMqReflectUtils.getTopic(subscriptionData);
if (!RocketMqSubscriptionDataUtils.getGrayTagChangeFlag(topic, balance)) {
return;
}
Expand All @@ -74,7 +75,7 @@ private void buildSql92SubscriptionData(SubscriptionData subscriptionData, Rebal
}
}

private void resetsSql92SubscriptionData(String topic, String consumerGroup, SubscriptionData subscriptionData,
private void resetsSql92SubscriptionData(String topic, String consumerGroup, Object subscriptionData,
String namesrvAddr) {
String subscribeScope = RocketMqSubscriptionDataUtils.buildSubscribeScope(topic, consumerGroup,
namesrvAddr);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright (C) 2024-2024 Sermant Authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.sermant.mq.grayscale.rocketmq.interceptor.client51;

import io.sermant.core.plugin.agent.entity.ExecuteContext;
import io.sermant.core.plugin.agent.interceptor.AbstractInterceptor;
import io.sermant.core.utils.StringUtils;
import io.sermant.mq.grayscale.config.MqGrayConfigCache;
import io.sermant.mq.grayscale.rocketmq.utils.RocketMqGrayscaleConfigUtils;

/**
* upper 5.1.x DefaultMQPushConsumer/DefaultLitePullConsumer/DefaultMQPullConsumer Constructor method interceptor
* gray scene reset consumerGroup with grayGroupTag
*
* @author chengyouling
* @since 2024-05-27
**/
public class RocketMqV51ConsumerConstructorInterceptor extends AbstractInterceptor {
@Override
public ExecuteContext before(ExecuteContext context) throws Exception {
if (!MqGrayConfigCache.getCacheConfig().isEnabled()) {
return context;
}
String grayGroupTag = RocketMqGrayscaleConfigUtils.getGrayGroupTag();
if (StringUtils.isEmpty(grayGroupTag)) {
return context;
}
String originGroup = (String) context.getArguments()[0];
context.getArguments()[0]
= originGroup.contains("_" + grayGroupTag) ? originGroup : originGroup + "_" + grayGroupTag;
return context;
}

@Override
public ExecuteContext after(ExecuteContext context) throws Exception {
return context;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,14 @@
import io.sermant.mq.grayscale.config.MqGrayConfigCache;
import io.sermant.mq.grayscale.rocketmq.config.RocketMqConsumerClientConfig;
import io.sermant.mq.grayscale.rocketmq.utils.RocketMqGrayscaleConfigUtils;
import io.sermant.mq.grayscale.rocketmq.utils.RocketMqReflectUtils;
import io.sermant.mq.grayscale.rocketmq.utils.RocketMqSubscriptionDataUtils;

import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.MQClientAPIImpl;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.common.protocol.body.GroupList;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
Expand Down Expand Up @@ -158,22 +156,22 @@ private static Set<String> findGrayConsumerGroupAndGetTags(RocketMqConsumerClien
try {
MQClientAPIImpl mqClientApi = clientConfig.getMqClientInstance().getMQClientAPIImpl();
String brokerAddress = getBrokerAddress(clientConfig.getTopic(), mqClientApi);
GroupList groupList = mqClientApi.queryTopicConsumeByWho(brokerAddress, clientConfig.getTopic(),
ROCKET_MQ_READ_TIMEOUT);
Object groupList = RocketMqReflectUtils.queryTopicConsumeByWho(mqClientApi, brokerAddress,
clientConfig.getTopic(), ROCKET_MQ_READ_TIMEOUT);
return getGrayTagsByConsumerGroup(groupList, brokerAddress, mqClientApi,
clientConfig.getConsumerGroup());
} catch (MQClientException | InterruptedException | RemotingTimeoutException | RemotingSendRequestException
| RemotingConnectException | MQBrokerException e) {
| RemotingConnectException e) {
LOGGER.log(Level.FINE, String.format(Locale.ENGLISH, "[auto-check] error, message: %s",
e.getMessage()), e);
}
return new HashSet<>();
}

private static Set<String> getGrayTagsByConsumerGroup(GroupList groupList, String brokerAddress,
private static Set<String> getGrayTagsByConsumerGroup(Object groupList, String brokerAddress,
MQClientAPIImpl mqClientApi, String consumerGroup) {
Set<String> grayTags = new HashSet<>();
for (String group : groupList.getGroupList()) {
for (String group : RocketMqReflectUtils.getGroupList(groupList)) {
try {
List<String> consumerIds = mqClientApi.getConsumerIdListByGroup(brokerAddress, group,
ROCKET_MQ_READ_TIMEOUT);
Expand All @@ -196,11 +194,11 @@ private static Set<String> getGrayTagsByConsumerGroup(GroupList groupList, Strin
private static String getBrokerAddress(String topic, MQClientAPIImpl mqClientApi)
throws RemotingSendRequestException, RemotingConnectException, RemotingTimeoutException,
InterruptedException, MQClientException {
TopicRouteData topicRouteData = mqClientApi.getTopicRouteInfoFromNameServer(topic, ROCKET_MQ_READ_TIMEOUT,
false);
Object topicRouteData = RocketMqReflectUtils.getTopicRouteInfoFromNameServer(mqClientApi, topic,
ROCKET_MQ_READ_TIMEOUT, false);
List<String> brokerList = new ArrayList<>();
for (BrokerData brokerData : topicRouteData.getBrokerDatas()) {
brokerList.addAll(brokerData.getBrokerAddrs().values());
for (Object brokerData : RocketMqReflectUtils.getBrokerDatas(topicRouteData)) {
brokerList.addAll(RocketMqReflectUtils.getBrokerAddrs(brokerData).values());
}

// cluster mode has multiple addresses, just select one
Expand Down
Loading

0 comments on commit 6aae697

Please sign in to comment.