From 0f215c0dc4c7628f97a6f50e1213e32d7bf2315e Mon Sep 17 00:00:00 2001 From: wanghuaiyuan Date: Wed, 20 Nov 2024 13:12:58 +0800 Subject: [PATCH 1/2] Remove unnecessary network communication when rebalance in POP --- .../client/impl/consumer/RebalanceImpl.java | 48 +------------------ 1 file changed, 1 insertion(+), 47 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java index d1f0d116e05..1009b0b1d7b 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java @@ -241,7 +241,7 @@ public boolean doRebalance(final boolean isOrder) { for (final Map.Entry entry : subTable.entrySet()) { final String topic = entry.getKey(); try { - if (!clientRebalance(topic) && tryQueryAssignment(topic)) { + if (!clientRebalance(topic)) { boolean result = this.getRebalanceResultFromBroker(topic, isOrder); if (!result) { balanced = false; @@ -266,38 +266,6 @@ public boolean doRebalance(final boolean isOrder) { return balanced; } - private boolean tryQueryAssignment(String topic) { - if (topicClientRebalance.containsKey(topic)) { - return false; - } - - if (topicBrokerRebalance.containsKey(topic)) { - return true; - } - String strategyName = allocateMessageQueueStrategy != null ? allocateMessageQueueStrategy.getName() : null; - int retryTimes = 0; - while (retryTimes++ < TIMEOUT_CHECK_TIMES) { - try { - Set resultSet = mQClientFactory.queryAssignment(topic, consumerGroup, - strategyName, messageModel, QUERY_ASSIGNMENT_TIMEOUT / TIMEOUT_CHECK_TIMES * retryTimes); - topicBrokerRebalance.put(topic, topic); - return true; - } catch (Throwable t) { - if (!(t instanceof RemotingTimeoutException)) { - log.error("tryQueryAssignment error.", t); - topicClientRebalance.put(topic, topic); - return false; - } - } - } - if (retryTimes >= TIMEOUT_CHECK_TIMES) { - // if never success before and timeout exceed TIMEOUT_CHECK_TIMES, force client rebalance - topicClientRebalance.put(topic, topic); - return false; - } - return true; - } - public ConcurrentMap getSubscriptionInner() { return subscriptionInner; } @@ -460,20 +428,6 @@ private void truncateMessageQueueNotMyTopic() { } } } - - Iterator> clientIter = topicClientRebalance.entrySet().iterator(); - while (clientIter.hasNext()) { - if (!subTable.containsKey(clientIter.next().getKey())) { - clientIter.remove(); - } - } - - Iterator> brokerIter = topicBrokerRebalance.entrySet().iterator(); - while (brokerIter.hasNext()) { - if (!subTable.containsKey(brokerIter.next().getKey())) { - brokerIter.remove(); - } - } } private boolean updateProcessQueueTableInRebalance(final String topic, final Set mqSet, From 3cc196554ce90e0301cd9bd971842c09f53a976d Mon Sep 17 00:00:00 2001 From: wanghuaiyuan Date: Wed, 20 Nov 2024 13:13:45 +0800 Subject: [PATCH 2/2] Remove unnecessary network communication when rebalance in POP --- .../apache/rocketmq/client/impl/consumer/RebalanceImpl.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java index 1009b0b1d7b..b6f1d99b1c7 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java @@ -36,7 +36,6 @@ import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageQueueAssignment; import org.apache.rocketmq.common.message.MessageRequestMode; -import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.protocol.body.LockBatchRequestBody; import org.apache.rocketmq.remoting.protocol.body.UnlockBatchRequestBody; import org.apache.rocketmq.remoting.protocol.filter.FilterAPI; @@ -60,12 +59,8 @@ public abstract class RebalanceImpl { protected MessageModel messageModel; protected AllocateMessageQueueStrategy allocateMessageQueueStrategy; protected MQClientInstance mQClientFactory; - private static final int TIMEOUT_CHECK_TIMES = 3; private static final int QUERY_ASSIGNMENT_TIMEOUT = 3000; - private Map topicBrokerRebalance = new ConcurrentHashMap<>(); - private Map topicClientRebalance = new ConcurrentHashMap<>(); - public RebalanceImpl(String consumerGroup, MessageModel messageModel, AllocateMessageQueueStrategy allocateMessageQueueStrategy, MQClientInstance mQClientFactory) {