diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 67c9da0bbd87..c507873514cf 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -68,8 +68,6 @@ import org.apache.rocketmq.common.protocol.body.QueryConsumeTimeSpanBody; import org.apache.rocketmq.common.protocol.body.QueryCorrectionOffsetBody; import org.apache.rocketmq.common.protocol.body.QueueTimeSpan; -import org.apache.rocketmq.common.protocol.body.ResumeCheckHalfMessageResult; -import org.apache.rocketmq.common.protocol.body.ResumeResult; import org.apache.rocketmq.common.protocol.body.TopicList; import org.apache.rocketmq.common.protocol.body.UnlockBatchRequestBody; import org.apache.rocketmq.common.protocol.header.CloneGroupOffsetRequestHeader; @@ -128,9 +126,7 @@ import org.apache.rocketmq.store.SelectMappedBufferResult; public class AdminBrokerProcessor implements NettyRequestProcessor { - - private static final InternalLogger log = InternalLoggerFactory - .getLogger(LoggerName.BROKER_LOGGER_NAME); + private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private final BrokerController brokerController; public AdminBrokerProcessor(final BrokerController brokerController) { @@ -139,7 +135,7 @@ public AdminBrokerProcessor(final BrokerController brokerController) { @Override public RemotingCommand processRequest(ChannelHandlerContext ctx, - RemotingCommand request) throws RemotingCommandException { + RemotingCommand request) throws RemotingCommandException { switch (request.getCode()) { case RequestCode.UPDATE_AND_CREATE_TOPIC: return this.updateAndCreateTopic(ctx, request); @@ -228,18 +224,14 @@ public boolean rejectRequest() { } private synchronized RemotingCommand updateAndCreateTopic(ChannelHandlerContext ctx, - RemotingCommand request) throws RemotingCommandException { + RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); final CreateTopicRequestHeader requestHeader = - (CreateTopicRequestHeader) request - .decodeCommandCustomHeader(CreateTopicRequestHeader.class); - log.info("updateAndCreateTopic called by {}", - RemotingHelper.parseChannelRemoteAddr(ctx.channel())); - - if (requestHeader.getTopic() - .equals(this.brokerController.getBrokerConfig().getBrokerClusterName())) { - String errorMsg = "the topic[" + requestHeader.getTopic() - + "] is conflict with system reserved words."; + (CreateTopicRequestHeader) request.decodeCommandCustomHeader(CreateTopicRequestHeader.class); + log.info("updateAndCreateTopic called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel())); + + if (requestHeader.getTopic().equals(this.brokerController.getBrokerConfig().getBrokerClusterName())) { + String errorMsg = "the topic[" + requestHeader.getTopic() + "] is conflict with system reserved words."; log.warn(errorMsg); response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark(errorMsg); @@ -261,31 +253,26 @@ private synchronized RemotingCommand updateAndCreateTopic(ChannelHandlerContext topicConfig.setWriteQueueNums(requestHeader.getWriteQueueNums()); topicConfig.setTopicFilterType(requestHeader.getTopicFilterTypeEnum()); topicConfig.setPerm(requestHeader.getPerm()); - topicConfig.setTopicSysFlag( - requestHeader.getTopicSysFlag() == null ? 0 : requestHeader.getTopicSysFlag()); + topicConfig.setTopicSysFlag(requestHeader.getTopicSysFlag() == null ? 0 : requestHeader.getTopicSysFlag()); this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig); - this.brokerController.registerIncrementBrokerData(topicConfig, - this.brokerController.getTopicConfigManager().getDataVersion()); + this.brokerController.registerIncrementBrokerData(topicConfig, this.brokerController.getTopicConfigManager().getDataVersion()); return null; } private synchronized RemotingCommand deleteTopic(ChannelHandlerContext ctx, - RemotingCommand request) throws RemotingCommandException { + RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); DeleteTopicRequestHeader requestHeader = - (DeleteTopicRequestHeader) request - .decodeCommandCustomHeader(DeleteTopicRequestHeader.class); + (DeleteTopicRequestHeader) request.decodeCommandCustomHeader(DeleteTopicRequestHeader.class); log.info("deleteTopic called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel())); this.brokerController.getTopicConfigManager().deleteTopicConfig(requestHeader.getTopic()); this.brokerController.getMessageStore() - .cleanUnusedTopic( - this.brokerController.getTopicConfigManager().getTopicConfigTable() - .keySet()); + .cleanUnusedTopic(this.brokerController.getTopicConfigManager().getTopicConfigTable().keySet()); response.setCode(ResponseCode.SUCCESS); response.setRemark(null); @@ -293,8 +280,7 @@ private synchronized RemotingCommand deleteTopic(ChannelHandlerContext ctx, } private RemotingCommand getAllTopicConfig(ChannelHandlerContext ctx, RemotingCommand request) { - final RemotingCommand response = RemotingCommand - .createResponseCommand(GetAllTopicConfigResponseHeader.class); + final RemotingCommand response = RemotingCommand.createResponseCommand(GetAllTopicConfigResponseHeader.class); // final GetAllTopicConfigResponseHeader responseHeader = // (GetAllTopicConfigResponseHeader) response.readCustomHeader(); @@ -322,12 +308,10 @@ private RemotingCommand getAllTopicConfig(ChannelHandlerContext ctx, RemotingCom return response; } - private synchronized RemotingCommand updateBrokerConfig(ChannelHandlerContext ctx, - RemotingCommand request) { + private synchronized RemotingCommand updateBrokerConfig(ChannelHandlerContext ctx, RemotingCommand request) { final RemotingCommand response = RemotingCommand.createResponseCommand(null); - log.info("updateBrokerConfig called by {}", - RemotingHelper.parseChannelRemoteAddr(ctx.channel())); + log.info("updateBrokerConfig called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel())); byte[] body = request.getBody(); if (body != null) { @@ -335,12 +319,10 @@ private synchronized RemotingCommand updateBrokerConfig(ChannelHandlerContext ct String bodyStr = new String(body, MixAll.DEFAULT_CHARSET); Properties properties = MixAll.string2Properties(bodyStr); if (properties != null) { - log.info("updateBrokerConfig, new config: [{}] client: {} ", properties, - ctx.channel().remoteAddress()); + log.info("updateBrokerConfig, new config: [{}] client: {} ", properties, ctx.channel().remoteAddress()); this.brokerController.getConfiguration().update(properties); if (properties.containsKey("brokerPermission")) { - this.brokerController.getTopicConfigManager().getDataVersion() - .nextVersion(); + this.brokerController.getTopicConfigManager().getDataVersion().nextVersion(); this.brokerController.registerBrokerAll(false, false, true); } } else { @@ -364,10 +346,8 @@ private synchronized RemotingCommand updateBrokerConfig(ChannelHandlerContext ct private RemotingCommand getBrokerConfig(ChannelHandlerContext ctx, RemotingCommand request) { - final RemotingCommand response = RemotingCommand - .createResponseCommand(GetBrokerConfigResponseHeader.class); - final GetBrokerConfigResponseHeader responseHeader = (GetBrokerConfigResponseHeader) response - .readCustomHeader(); + final RemotingCommand response = RemotingCommand.createResponseCommand(GetBrokerConfigResponseHeader.class); + final GetBrokerConfigResponseHeader responseHeader = (GetBrokerConfigResponseHeader) response.readCustomHeader(); String content = this.brokerController.getConfiguration().getAllConfigsFormatString(); if (content != null && content.length() > 0) { @@ -390,18 +370,14 @@ private RemotingCommand getBrokerConfig(ChannelHandlerContext ctx, RemotingComma } private RemotingCommand searchOffsetByTimestamp(ChannelHandlerContext ctx, - RemotingCommand request) throws RemotingCommandException { - final RemotingCommand response = RemotingCommand - .createResponseCommand(SearchOffsetResponseHeader.class); - final SearchOffsetResponseHeader responseHeader = (SearchOffsetResponseHeader) response - .readCustomHeader(); + RemotingCommand request) throws RemotingCommandException { + final RemotingCommand response = RemotingCommand.createResponseCommand(SearchOffsetResponseHeader.class); + final SearchOffsetResponseHeader responseHeader = (SearchOffsetResponseHeader) response.readCustomHeader(); final SearchOffsetRequestHeader requestHeader = - (SearchOffsetRequestHeader) request - .decodeCommandCustomHeader(SearchOffsetRequestHeader.class); + (SearchOffsetRequestHeader) request.decodeCommandCustomHeader(SearchOffsetRequestHeader.class); - long offset = this.brokerController.getMessageStore() - .getOffsetInQueueByTime(requestHeader.getTopic(), requestHeader.getQueueId(), - requestHeader.getTimestamp()); + long offset = this.brokerController.getMessageStore().getOffsetInQueueByTime(requestHeader.getTopic(), requestHeader.getQueueId(), + requestHeader.getTimestamp()); responseHeader.setOffset(offset); @@ -411,17 +387,13 @@ private RemotingCommand searchOffsetByTimestamp(ChannelHandlerContext ctx, } private RemotingCommand getMaxOffset(ChannelHandlerContext ctx, - RemotingCommand request) throws RemotingCommandException { - final RemotingCommand response = RemotingCommand - .createResponseCommand(GetMaxOffsetResponseHeader.class); - final GetMaxOffsetResponseHeader responseHeader = (GetMaxOffsetResponseHeader) response - .readCustomHeader(); + RemotingCommand request) throws RemotingCommandException { + final RemotingCommand response = RemotingCommand.createResponseCommand(GetMaxOffsetResponseHeader.class); + final GetMaxOffsetResponseHeader responseHeader = (GetMaxOffsetResponseHeader) response.readCustomHeader(); final GetMaxOffsetRequestHeader requestHeader = - (GetMaxOffsetRequestHeader) request - .decodeCommandCustomHeader(GetMaxOffsetRequestHeader.class); + (GetMaxOffsetRequestHeader) request.decodeCommandCustomHeader(GetMaxOffsetRequestHeader.class); - long offset = this.brokerController.getMessageStore() - .getMaxOffsetInQueue(requestHeader.getTopic(), requestHeader.getQueueId()); + long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(requestHeader.getTopic(), requestHeader.getQueueId()); responseHeader.setOffset(offset); @@ -431,17 +403,13 @@ private RemotingCommand getMaxOffset(ChannelHandlerContext ctx, } private RemotingCommand getMinOffset(ChannelHandlerContext ctx, - RemotingCommand request) throws RemotingCommandException { - final RemotingCommand response = RemotingCommand - .createResponseCommand(GetMinOffsetResponseHeader.class); - final GetMinOffsetResponseHeader responseHeader = (GetMinOffsetResponseHeader) response - .readCustomHeader(); + RemotingCommand request) throws RemotingCommandException { + final RemotingCommand response = RemotingCommand.createResponseCommand(GetMinOffsetResponseHeader.class); + final GetMinOffsetResponseHeader responseHeader = (GetMinOffsetResponseHeader) response.readCustomHeader(); final GetMinOffsetRequestHeader requestHeader = - (GetMinOffsetRequestHeader) request - .decodeCommandCustomHeader(GetMinOffsetRequestHeader.class); + (GetMinOffsetRequestHeader) request.decodeCommandCustomHeader(GetMinOffsetRequestHeader.class); - long offset = this.brokerController.getMessageStore() - .getMinOffsetInQueue(requestHeader.getTopic(), requestHeader.getQueueId()); + long offset = this.brokerController.getMessageStore().getMinOffsetInQueue(requestHeader.getTopic(), requestHeader.getQueueId()); responseHeader.setOffset(offset); response.setCode(ResponseCode.SUCCESS); @@ -450,19 +418,14 @@ private RemotingCommand getMinOffset(ChannelHandlerContext ctx, } private RemotingCommand getEarliestMsgStoretime(ChannelHandlerContext ctx, - RemotingCommand request) throws RemotingCommandException { - final RemotingCommand response = RemotingCommand - .createResponseCommand(GetEarliestMsgStoretimeResponseHeader.class); - final GetEarliestMsgStoretimeResponseHeader responseHeader = (GetEarliestMsgStoretimeResponseHeader) response - .readCustomHeader(); + RemotingCommand request) throws RemotingCommandException { + final RemotingCommand response = RemotingCommand.createResponseCommand(GetEarliestMsgStoretimeResponseHeader.class); + final GetEarliestMsgStoretimeResponseHeader responseHeader = (GetEarliestMsgStoretimeResponseHeader) response.readCustomHeader(); final GetEarliestMsgStoretimeRequestHeader requestHeader = - (GetEarliestMsgStoretimeRequestHeader) request - .decodeCommandCustomHeader(GetEarliestMsgStoretimeRequestHeader.class); + (GetEarliestMsgStoretimeRequestHeader) request.decodeCommandCustomHeader(GetEarliestMsgStoretimeRequestHeader.class); long timestamp = - this.brokerController.getMessageStore() - .getEarliestMessageTime(requestHeader.getTopic(), - requestHeader.getQueueId()); + this.brokerController.getMessageStore().getEarliestMessageTime(requestHeader.getTopic(), requestHeader.getQueueId()); responseHeader.setTimestamp(timestamp); response.setCode(ResponseCode.SUCCESS); @@ -470,8 +433,7 @@ private RemotingCommand getEarliestMsgStoretime(ChannelHandlerContext ctx, return response; } - private RemotingCommand getBrokerRuntimeInfo(ChannelHandlerContext ctx, - RemotingCommand request) { + private RemotingCommand getBrokerRuntimeInfo(ChannelHandlerContext ctx, RemotingCommand request) { final RemotingCommand response = RemotingCommand.createResponseCommand(null); HashMap runtimeInfo = this.prepareRuntimeInfo(); @@ -486,16 +448,14 @@ private RemotingCommand getBrokerRuntimeInfo(ChannelHandlerContext ctx, } private RemotingCommand lockBatchMQ(ChannelHandlerContext ctx, - RemotingCommand request) throws RemotingCommandException { + RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); - LockBatchRequestBody requestBody = LockBatchRequestBody - .decode(request.getBody(), LockBatchRequestBody.class); + LockBatchRequestBody requestBody = LockBatchRequestBody.decode(request.getBody(), LockBatchRequestBody.class); - Set lockOKMQSet = this.brokerController.getRebalanceLockManager() - .tryLockBatch( - requestBody.getConsumerGroup(), - requestBody.getMqSet(), - requestBody.getClientId()); + Set lockOKMQSet = this.brokerController.getRebalanceLockManager().tryLockBatch( + requestBody.getConsumerGroup(), + requestBody.getMqSet(), + requestBody.getClientId()); LockBatchResponseBody responseBody = new LockBatchResponseBody(); responseBody.setLockOKMQSet(lockOKMQSet); @@ -507,34 +467,29 @@ private RemotingCommand lockBatchMQ(ChannelHandlerContext ctx, } private RemotingCommand unlockBatchMQ(ChannelHandlerContext ctx, - RemotingCommand request) throws RemotingCommandException { + RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); - UnlockBatchRequestBody requestBody = UnlockBatchRequestBody - .decode(request.getBody(), UnlockBatchRequestBody.class); + UnlockBatchRequestBody requestBody = UnlockBatchRequestBody.decode(request.getBody(), UnlockBatchRequestBody.class); this.brokerController.getRebalanceLockManager().unlockBatch( - requestBody.getConsumerGroup(), - requestBody.getMqSet(), - requestBody.getClientId()); + requestBody.getConsumerGroup(), + requestBody.getMqSet(), + requestBody.getClientId()); response.setCode(ResponseCode.SUCCESS); response.setRemark(null); return response; } - private RemotingCommand updateAndCreateSubscriptionGroup(ChannelHandlerContext ctx, - RemotingCommand request) - throws RemotingCommandException { + private RemotingCommand updateAndCreateSubscriptionGroup(ChannelHandlerContext ctx, RemotingCommand request) + throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); - log.info("updateAndCreateSubscriptionGroup called by {}", - RemotingHelper.parseChannelRemoteAddr(ctx.channel())); + log.info("updateAndCreateSubscriptionGroup called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel())); - SubscriptionGroupConfig config = RemotingSerializable - .decode(request.getBody(), SubscriptionGroupConfig.class); + SubscriptionGroupConfig config = RemotingSerializable.decode(request.getBody(), SubscriptionGroupConfig.class); if (config != null) { - this.brokerController.getSubscriptionGroupManager() - .updateSubscriptionGroupConfig(config); + this.brokerController.getSubscriptionGroupManager().updateSubscriptionGroupConfig(config); } response.setCode(ResponseCode.SUCCESS); @@ -543,7 +498,7 @@ private RemotingCommand updateAndCreateSubscriptionGroup(ChannelHandlerContext c } private RemotingCommand getAllSubscriptionGroup(ChannelHandlerContext ctx, - RemotingCommand request) throws RemotingCommandException { + RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); String content = this.brokerController.getSubscriptionGroupManager().encode(); if (content != null && content.length() > 0) { @@ -557,8 +512,7 @@ private RemotingCommand getAllSubscriptionGroup(ChannelHandlerContext ctx, return response; } } else { - log.error("No subscription group in this broker, client:{} ", - ctx.channel().remoteAddress()); + log.error("No subscription group in this broker, client:{} ", ctx.channel().remoteAddress()); response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("No subscription group in this broker"); return response; @@ -571,17 +525,14 @@ private RemotingCommand getAllSubscriptionGroup(ChannelHandlerContext ctx, } private RemotingCommand deleteSubscriptionGroup(ChannelHandlerContext ctx, - RemotingCommand request) throws RemotingCommandException { + RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); DeleteSubscriptionGroupRequestHeader requestHeader = - (DeleteSubscriptionGroupRequestHeader) request - .decodeCommandCustomHeader(DeleteSubscriptionGroupRequestHeader.class); + (DeleteSubscriptionGroupRequestHeader) request.decodeCommandCustomHeader(DeleteSubscriptionGroupRequestHeader.class); - log.info("deleteSubscriptionGroup called by {}", - RemotingHelper.parseChannelRemoteAddr(ctx.channel())); + log.info("deleteSubscriptionGroup called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel())); - this.brokerController.getSubscriptionGroupManager() - .deleteSubscriptionGroupConfig(requestHeader.getGroupName()); + this.brokerController.getSubscriptionGroupManager().deleteSubscriptionGroupConfig(requestHeader.getGroupName()); response.setCode(ResponseCode.SUCCESS); response.setRemark(null); @@ -589,15 +540,13 @@ private RemotingCommand deleteSubscriptionGroup(ChannelHandlerContext ctx, } private RemotingCommand getTopicStatsInfo(ChannelHandlerContext ctx, - RemotingCommand request) throws RemotingCommandException { + RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); final GetTopicStatsInfoRequestHeader requestHeader = - (GetTopicStatsInfoRequestHeader) request - .decodeCommandCustomHeader(GetTopicStatsInfoRequestHeader.class); + (GetTopicStatsInfoRequestHeader) request.decodeCommandCustomHeader(GetTopicStatsInfoRequestHeader.class); final String topic = requestHeader.getTopic(); - TopicConfig topicConfig = this.brokerController.getTopicConfigManager() - .selectTopicConfig(topic); + TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic); if (null == topicConfig) { response.setCode(ResponseCode.TOPIC_NOT_EXIST); response.setRemark("topic[" + topic + "] not exist"); @@ -613,19 +562,16 @@ private RemotingCommand getTopicStatsInfo(ChannelHandlerContext ctx, TopicOffset topicOffset = new TopicOffset(); long min = this.brokerController.getMessageStore().getMinOffsetInQueue(topic, i); - if (min < 0) { + if (min < 0) min = 0; - } long max = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, i); - if (max < 0) { + if (max < 0) max = 0; - } long timestamp = 0; if (max > 0) { - timestamp = this.brokerController.getMessageStore() - .getMessageStoreTimeStamp(topic, i, max - 1); + timestamp = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, max - 1); } topicOffset.setMinOffset(min); @@ -643,15 +589,13 @@ private RemotingCommand getTopicStatsInfo(ChannelHandlerContext ctx, } private RemotingCommand getConsumerConnectionList(ChannelHandlerContext ctx, - RemotingCommand request) throws RemotingCommandException { + RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); final GetConsumerConnectionListRequestHeader requestHeader = - (GetConsumerConnectionListRequestHeader) request - .decodeCommandCustomHeader(GetConsumerConnectionListRequestHeader.class); + (GetConsumerConnectionListRequestHeader) request.decodeCommandCustomHeader(GetConsumerConnectionListRequestHeader.class); ConsumerGroupInfo consumerGroupInfo = - this.brokerController.getConsumerManager() - .getConsumerGroupInfo(requestHeader.getConsumerGroup()); + this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup()); if (consumerGroupInfo != null) { ConsumerConnection bodydata = new ConsumerConnection(); bodydata.setConsumeFromWhere(consumerGroupInfo.getConsumeFromWhere()); @@ -659,8 +603,7 @@ private RemotingCommand getConsumerConnectionList(ChannelHandlerContext ctx, bodydata.setMessageModel(consumerGroupInfo.getMessageModel()); bodydata.getSubscriptionTable().putAll(consumerGroupInfo.getSubscriptionTable()); - Iterator> it = consumerGroupInfo - .getChannelInfoTable().entrySet().iterator(); + Iterator> it = consumerGroupInfo.getChannelInfoTable().entrySet().iterator(); while (it.hasNext()) { ClientChannelInfo info = it.next().getValue(); Connection connection = new Connection(); @@ -681,25 +624,21 @@ private RemotingCommand getConsumerConnectionList(ChannelHandlerContext ctx, } response.setCode(ResponseCode.CONSUMER_NOT_ONLINE); - response.setRemark( - "the consumer group[" + requestHeader.getConsumerGroup() + "] not online"); + response.setRemark("the consumer group[" + requestHeader.getConsumerGroup() + "] not online"); return response; } private RemotingCommand getProducerConnectionList(ChannelHandlerContext ctx, - RemotingCommand request) throws RemotingCommandException { + RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); final GetProducerConnectionListRequestHeader requestHeader = - (GetProducerConnectionListRequestHeader) request - .decodeCommandCustomHeader(GetProducerConnectionListRequestHeader.class); + (GetProducerConnectionListRequestHeader) request.decodeCommandCustomHeader(GetProducerConnectionListRequestHeader.class); ProducerConnection bodydata = new ProducerConnection(); HashMap channelInfoHashMap = - this.brokerController.getProducerManager().getGroupChannelTable() - .get(requestHeader.getProducerGroup()); + this.brokerController.getProducerManager().getGroupChannelTable().get(requestHeader.getProducerGroup()); if (channelInfoHashMap != null) { - Iterator> it = channelInfoHashMap.entrySet() - .iterator(); + Iterator> it = channelInfoHashMap.entrySet().iterator(); while (it.hasNext()) { ClientChannelInfo info = it.next().getValue(); Connection connection = new Connection(); @@ -719,31 +658,27 @@ private RemotingCommand getProducerConnectionList(ChannelHandlerContext ctx, } response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark( - "the producer group[" + requestHeader.getProducerGroup() + "] not exist"); + response.setRemark("the producer group[" + requestHeader.getProducerGroup() + "] not exist"); return response; } private RemotingCommand getConsumeStats(ChannelHandlerContext ctx, - RemotingCommand request) throws RemotingCommandException { + RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); final GetConsumeStatsRequestHeader requestHeader = - (GetConsumeStatsRequestHeader) request - .decodeCommandCustomHeader(GetConsumeStatsRequestHeader.class); + (GetConsumeStatsRequestHeader) request.decodeCommandCustomHeader(GetConsumeStatsRequestHeader.class); ConsumeStats consumeStats = new ConsumeStats(); Set topics = new HashSet(); if (UtilAll.isBlank(requestHeader.getTopic())) { - topics = this.brokerController.getConsumerOffsetManager() - .whichTopicByConsumer(requestHeader.getConsumerGroup()); + topics = this.brokerController.getConsumerOffsetManager().whichTopicByConsumer(requestHeader.getConsumerGroup()); } else { topics.add(requestHeader.getTopic()); } for (String topic : topics) { - TopicConfig topicConfig = this.brokerController.getTopicConfigManager() - .selectTopicConfig(topic); + TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic); if (null == topicConfig) { log.warn("consumeStats, topic config not exist, {}", topic); continue; @@ -751,14 +686,11 @@ private RemotingCommand getConsumeStats(ChannelHandlerContext ctx, { SubscriptionData findSubscriptionData = - this.brokerController.getConsumerManager() - .findSubscriptionData(requestHeader.getConsumerGroup(), topic); + this.brokerController.getConsumerManager().findSubscriptionData(requestHeader.getConsumerGroup(), topic); if (null == findSubscriptionData - && this.brokerController.getConsumerManager() - .findSubscriptionDataCount(requestHeader.getConsumerGroup()) > 0) { - log.warn("consumeStats, the consumer group[{}], topic[{}] not exist", - requestHeader.getConsumerGroup(), topic); + && this.brokerController.getConsumerManager().findSubscriptionDataCount(requestHeader.getConsumerGroup()) > 0) { + log.warn("consumeStats, the consumer group[{}], topic[{}] not exist", requestHeader.getConsumerGroup(), topic); continue; } } @@ -771,27 +703,23 @@ private RemotingCommand getConsumeStats(ChannelHandlerContext ctx, OffsetWrapper offsetWrapper = new OffsetWrapper(); - long brokerOffset = this.brokerController.getMessageStore() - .getMaxOffsetInQueue(topic, i); - if (brokerOffset < 0) { + long brokerOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, i); + if (brokerOffset < 0) brokerOffset = 0; - } long consumerOffset = this.brokerController.getConsumerOffsetManager().queryOffset( - requestHeader.getConsumerGroup(), - topic, - i); - if (consumerOffset < 0) { + requestHeader.getConsumerGroup(), + topic, + i); + if (consumerOffset < 0) consumerOffset = 0; - } offsetWrapper.setBrokerOffset(brokerOffset); offsetWrapper.setConsumerOffset(consumerOffset); long timeOffset = consumerOffset - 1; if (timeOffset >= 0) { - long lastTimestamp = this.brokerController.getMessageStore() - .getMessageStoreTimeStamp(topic, i, timeOffset); + long lastTimestamp = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, timeOffset); if (lastTimestamp > 0) { offsetWrapper.setLastTimestamp(lastTimestamp); } @@ -800,8 +728,7 @@ private RemotingCommand getConsumeStats(ChannelHandlerContext ctx, consumeStats.getOffsetTable().put(mq, offsetWrapper); } - double consumeTps = this.brokerController.getBrokerStatsManager() - .tpsGroupGetNums(requestHeader.getConsumerGroup(), topic); + double consumeTps = this.brokerController.getBrokerStatsManager().tpsGroupGetNums(requestHeader.getConsumerGroup(), topic); consumeTps += consumeStats.getConsumeTps(); consumeStats.setConsumeTps(consumeTps); @@ -814,8 +741,7 @@ private RemotingCommand getConsumeStats(ChannelHandlerContext ctx, return response; } - private RemotingCommand getAllConsumerOffset(ChannelHandlerContext ctx, - RemotingCommand request) { + private RemotingCommand getAllConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request) { final RemotingCommand response = RemotingCommand.createResponseCommand(null); String content = this.brokerController.getConsumerOffsetManager().encode(); @@ -830,8 +756,7 @@ private RemotingCommand getAllConsumerOffset(ChannelHandlerContext ctx, return response; } } else { - log.error("No consumer offset in this broker, client: {} ", - ctx.channel().remoteAddress()); + log.error("No consumer offset in this broker, client: {} ", ctx.channel().remoteAddress()); response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("No consumer offset in this broker"); return response; @@ -847,15 +772,13 @@ private RemotingCommand getAllDelayOffset(ChannelHandlerContext ctx, RemotingCom final RemotingCommand response = RemotingCommand.createResponseCommand(null); if (!(this.brokerController.getMessageStore() instanceof DefaultMessageStore)) { - log.error("Delay offset not supported in this messagetore, client: {} ", - ctx.channel().remoteAddress()); + log.error("Delay offset not supported in this messagetore, client: {} ", ctx.channel().remoteAddress()); response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("Delay offset not supported in this messagetore"); return response; } - String content = ((DefaultMessageStore) this.brokerController.getMessageStore()) - .getScheduleMessageService().encode(); + String content = ((DefaultMessageStore) this.brokerController.getMessageStore()).getScheduleMessageService().encode(); if (content != null && content.length() > 0) { try { response.setBody(content.getBytes(MixAll.DEFAULT_CHARSET)); @@ -880,15 +803,12 @@ private RemotingCommand getAllDelayOffset(ChannelHandlerContext ctx, RemotingCom } public RemotingCommand resetOffset(ChannelHandlerContext ctx, - RemotingCommand request) throws RemotingCommandException { + RemotingCommand request) throws RemotingCommandException { final ResetOffsetRequestHeader requestHeader = - (ResetOffsetRequestHeader) request - .decodeCommandCustomHeader(ResetOffsetRequestHeader.class); - log.info( - "[reset-offset] reset offset started by {}. topic={}, group={}, timestamp={}, isForce={}", - RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getTopic(), - requestHeader.getGroup(), - requestHeader.getTimestamp(), requestHeader.isForce()); + (ResetOffsetRequestHeader) request.decodeCommandCustomHeader(ResetOffsetRequestHeader.class); + log.info("[reset-offset] reset offset started by {}. topic={}, group={}, timestamp={}, isForce={}", + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getTopic(), requestHeader.getGroup(), + requestHeader.getTimestamp(), requestHeader.isForce()); boolean isC = false; LanguageCode language = request.getLanguage(); switch (language) { @@ -896,38 +816,31 @@ public RemotingCommand resetOffset(ChannelHandlerContext ctx, isC = true; break; } - return this.brokerController.getBroker2Client() - .resetOffset(requestHeader.getTopic(), requestHeader.getGroup(), - requestHeader.getTimestamp(), requestHeader.isForce(), isC); + return this.brokerController.getBroker2Client().resetOffset(requestHeader.getTopic(), requestHeader.getGroup(), + requestHeader.getTimestamp(), requestHeader.isForce(), isC); } public RemotingCommand getConsumerStatus(ChannelHandlerContext ctx, - RemotingCommand request) throws RemotingCommandException { + RemotingCommand request) throws RemotingCommandException { final GetConsumerStatusRequestHeader requestHeader = - (GetConsumerStatusRequestHeader) request - .decodeCommandCustomHeader(GetConsumerStatusRequestHeader.class); + (GetConsumerStatusRequestHeader) request.decodeCommandCustomHeader(GetConsumerStatusRequestHeader.class); log.info("[get-consumer-status] get consumer status by {}. topic={}, group={}", - RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getTopic(), - requestHeader.getGroup()); + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getTopic(), requestHeader.getGroup()); - return this.brokerController.getBroker2Client() - .getConsumeStatus(requestHeader.getTopic(), requestHeader.getGroup(), - requestHeader.getClientAddr()); + return this.brokerController.getBroker2Client().getConsumeStatus(requestHeader.getTopic(), requestHeader.getGroup(), + requestHeader.getClientAddr()); } private RemotingCommand queryTopicConsumeByWho(ChannelHandlerContext ctx, - RemotingCommand request) throws RemotingCommandException { + RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); QueryTopicConsumeByWhoRequestHeader requestHeader = - (QueryTopicConsumeByWhoRequestHeader) request - .decodeCommandCustomHeader(QueryTopicConsumeByWhoRequestHeader.class); + (QueryTopicConsumeByWhoRequestHeader) request.decodeCommandCustomHeader(QueryTopicConsumeByWhoRequestHeader.class); - HashSet groups = this.brokerController.getConsumerManager() - .queryTopicConsumeByWho(requestHeader.getTopic()); + HashSet groups = this.brokerController.getConsumerManager().queryTopicConsumeByWho(requestHeader.getTopic()); - Set groupInOffset = this.brokerController.getConsumerOffsetManager() - .whichGroupByTopic(requestHeader.getTopic()); + Set groupInOffset = this.brokerController.getConsumerOffsetManager().whichGroupByTopic(requestHeader.getTopic()); if (groupInOffset != null && !groupInOffset.isEmpty()) { groups.addAll(groupInOffset); } @@ -943,17 +856,13 @@ private RemotingCommand queryTopicConsumeByWho(ChannelHandlerContext ctx, } private RemotingCommand registerFilterServer(ChannelHandlerContext ctx, - RemotingCommand request) throws RemotingCommandException { - final RemotingCommand response = RemotingCommand - .createResponseCommand(RegisterFilterServerResponseHeader.class); - final RegisterFilterServerResponseHeader responseHeader = (RegisterFilterServerResponseHeader) response - .readCustomHeader(); + RemotingCommand request) throws RemotingCommandException { + final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterFilterServerResponseHeader.class); + final RegisterFilterServerResponseHeader responseHeader = (RegisterFilterServerResponseHeader) response.readCustomHeader(); final RegisterFilterServerRequestHeader requestHeader = - (RegisterFilterServerRequestHeader) request - .decodeCommandCustomHeader(RegisterFilterServerRequestHeader.class); + (RegisterFilterServerRequestHeader) request.decodeCommandCustomHeader(RegisterFilterServerRequestHeader.class); - this.brokerController.getFilterServerManager() - .registerFilterServer(ctx.channel(), requestHeader.getFilterServerAddr()); + this.brokerController.getFilterServerManager().registerFilterServer(ctx.channel(), requestHeader.getFilterServerAddr()); responseHeader.setBrokerId(this.brokerController.getBrokerConfig().getBrokerId()); responseHeader.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName()); @@ -964,15 +873,13 @@ private RemotingCommand registerFilterServer(ChannelHandlerContext ctx, } private RemotingCommand queryConsumeTimeSpan(ChannelHandlerContext ctx, - RemotingCommand request) throws RemotingCommandException { + RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); QueryConsumeTimeSpanRequestHeader requestHeader = - (QueryConsumeTimeSpanRequestHeader) request - .decodeCommandCustomHeader(QueryConsumeTimeSpanRequestHeader.class); + (QueryConsumeTimeSpanRequestHeader) request.decodeCommandCustomHeader(QueryConsumeTimeSpanRequestHeader.class); final String topic = requestHeader.getTopic(); - TopicConfig topicConfig = this.brokerController.getTopicConfigManager() - .selectTopicConfig(topic); + TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic); if (null == topicConfig) { response.setCode(ResponseCode.TOPIC_NOT_EXIST); response.setRemark("topic[" + topic + "] not exist"); @@ -992,26 +899,22 @@ private RemotingCommand queryConsumeTimeSpan(ChannelHandlerContext ctx, timeSpan.setMinTimeStamp(minTime); long max = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, i); - long maxTime = this.brokerController.getMessageStore() - .getMessageStoreTimeStamp(topic, i, max - 1); + long maxTime = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, max - 1); timeSpan.setMaxTimeStamp(maxTime); long consumeTime; long consumerOffset = this.brokerController.getConsumerOffsetManager().queryOffset( - requestHeader.getGroup(), topic, i); + requestHeader.getGroup(), topic, i); if (consumerOffset > 0) { - consumeTime = this.brokerController.getMessageStore() - .getMessageStoreTimeStamp(topic, i, consumerOffset - 1); + consumeTime = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, consumerOffset - 1); } else { consumeTime = minTime; } timeSpan.setConsumeTimeStamp(consumeTime); - long maxBrokerOffset = this.brokerController.getMessageStore() - .getMaxOffsetInQueue(requestHeader.getTopic(), i); + long maxBrokerOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(requestHeader.getTopic(), i); if (consumerOffset < maxBrokerOffset) { - long nextTime = this.brokerController.getMessageStore() - .getMessageStoreTimeStamp(topic, i, consumerOffset); + long nextTime = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, consumerOffset); timeSpan.setDelayTime(System.currentTimeMillis() - nextTime); } timeSpanSet.add(timeSpan); @@ -1025,9 +928,8 @@ private RemotingCommand queryConsumeTimeSpan(ChannelHandlerContext ctx, return response; } - private RemotingCommand getSystemTopicListFromBroker(ChannelHandlerContext ctx, - RemotingCommand request) - throws RemotingCommandException { + private RemotingCommand getSystemTopicListFromBroker(ChannelHandlerContext ctx, RemotingCommand request) + throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); Set topics = this.brokerController.getTopicConfigManager().getSystemTopic(); @@ -1052,8 +954,7 @@ public RemotingCommand cleanExpiredConsumeQueue() { public RemotingCommand cleanUnusedTopic() { log.warn("invoke cleanUnusedTopic start."); final RemotingCommand response = RemotingCommand.createResponseCommand(null); - brokerController.getMessageStore().cleanUnusedTopic( - brokerController.getTopicConfigManager().getTopicConfigTable().keySet()); + brokerController.getMessageStore().cleanUnusedTopic(brokerController.getTopicConfigManager().getTopicConfigTable().keySet()); log.warn("invoke cleanUnusedTopic end."); response.setCode(ResponseCode.SUCCESS); response.setRemark(null); @@ -1061,37 +962,31 @@ public RemotingCommand cleanUnusedTopic() { } private RemotingCommand getConsumerRunningInfo(ChannelHandlerContext ctx, - RemotingCommand request) throws RemotingCommandException { + RemotingCommand request) throws RemotingCommandException { final GetConsumerRunningInfoRequestHeader requestHeader = - (GetConsumerRunningInfoRequestHeader) request - .decodeCommandCustomHeader(GetConsumerRunningInfoRequestHeader.class); + (GetConsumerRunningInfoRequestHeader) request.decodeCommandCustomHeader(GetConsumerRunningInfoRequestHeader.class); - return this.callConsumer(RequestCode.GET_CONSUMER_RUNNING_INFO, request, - requestHeader.getConsumerGroup(), - requestHeader.getClientId()); + return this.callConsumer(RequestCode.GET_CONSUMER_RUNNING_INFO, request, requestHeader.getConsumerGroup(), + requestHeader.getClientId()); } private RemotingCommand queryCorrectionOffset(ChannelHandlerContext ctx, - RemotingCommand request) throws RemotingCommandException { + RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); QueryCorrectionOffsetHeader requestHeader = - (QueryCorrectionOffsetHeader) request - .decodeCommandCustomHeader(QueryCorrectionOffsetHeader.class); + (QueryCorrectionOffsetHeader) request.decodeCommandCustomHeader(QueryCorrectionOffsetHeader.class); Map correctionOffset = this.brokerController.getConsumerOffsetManager() - .queryMinOffsetInAllGroup(requestHeader.getTopic(), - requestHeader.getFilterGroups()); + .queryMinOffsetInAllGroup(requestHeader.getTopic(), requestHeader.getFilterGroups()); Map compareOffset = - this.brokerController.getConsumerOffsetManager() - .queryOffset(requestHeader.getTopic(), requestHeader.getCompareGroup()); + this.brokerController.getConsumerOffsetManager().queryOffset(requestHeader.getTopic(), requestHeader.getCompareGroup()); if (compareOffset != null && !compareOffset.isEmpty()) { for (Map.Entry entry : compareOffset.entrySet()) { Integer queueId = entry.getKey(); correctionOffset.put(queueId, - correctionOffset.get(queueId) > entry.getValue() ? Long.MAX_VALUE - : correctionOffset.get(queueId)); + correctionOffset.get(queueId) > entry.getValue() ? Long.MAX_VALUE : correctionOffset.get(queueId)); } } @@ -1104,17 +999,15 @@ private RemotingCommand queryCorrectionOffset(ChannelHandlerContext ctx, } private RemotingCommand consumeMessageDirectly(ChannelHandlerContext ctx, - RemotingCommand request) throws RemotingCommandException { + RemotingCommand request) throws RemotingCommandException { final ConsumeMessageDirectlyResultRequestHeader requestHeader = (ConsumeMessageDirectlyResultRequestHeader) request - .decodeCommandCustomHeader(ConsumeMessageDirectlyResultRequestHeader.class); + .decodeCommandCustomHeader(ConsumeMessageDirectlyResultRequestHeader.class); - request.getExtFields() - .put("brokerName", this.brokerController.getBrokerConfig().getBrokerName()); + request.getExtFields().put("brokerName", this.brokerController.getBrokerConfig().getBrokerName()); SelectMappedBufferResult selectMappedBufferResult = null; try { MessageId messageId = MessageDecoder.decodeMessageId(requestHeader.getMsgId()); - selectMappedBufferResult = this.brokerController.getMessageStore() - .selectOneMessageByOffset(messageId.getOffset()); + selectMappedBufferResult = this.brokerController.getMessageStore().selectOneMessageByOffset(messageId.getOffset()); byte[] body = new byte[selectMappedBufferResult.getSize()]; selectMappedBufferResult.getByteBuffer().get(body); @@ -1126,30 +1019,26 @@ private RemotingCommand consumeMessageDirectly(ChannelHandlerContext ctx, } } - return this.callConsumer(RequestCode.CONSUME_MESSAGE_DIRECTLY, request, - requestHeader.getConsumerGroup(), - requestHeader.getClientId()); + return this.callConsumer(RequestCode.CONSUME_MESSAGE_DIRECTLY, request, requestHeader.getConsumerGroup(), + requestHeader.getClientId()); } private RemotingCommand cloneGroupOffset(ChannelHandlerContext ctx, - RemotingCommand request) throws RemotingCommandException { + RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); CloneGroupOffsetRequestHeader requestHeader = - (CloneGroupOffsetRequestHeader) request - .decodeCommandCustomHeader(CloneGroupOffsetRequestHeader.class); + (CloneGroupOffsetRequestHeader) request.decodeCommandCustomHeader(CloneGroupOffsetRequestHeader.class); Set topics; if (UtilAll.isBlank(requestHeader.getTopic())) { - topics = this.brokerController.getConsumerOffsetManager() - .whichTopicByConsumer(requestHeader.getSrcGroup()); + topics = this.brokerController.getConsumerOffsetManager().whichTopicByConsumer(requestHeader.getSrcGroup()); } else { topics = new HashSet(); topics.add(requestHeader.getTopic()); } for (String topic : topics) { - TopicConfig topicConfig = this.brokerController.getTopicConfigManager() - .selectTopicConfig(topic); + TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic); if (null == topicConfig) { log.warn("[cloneGroupOffset], topic config not exist, {}", topic); continue; @@ -1158,20 +1047,16 @@ private RemotingCommand cloneGroupOffset(ChannelHandlerContext ctx, if (!requestHeader.isOffline()) { SubscriptionData findSubscriptionData = - this.brokerController.getConsumerManager() - .findSubscriptionData(requestHeader.getSrcGroup(), topic); - if (this.brokerController.getConsumerManager() - .findSubscriptionDataCount(requestHeader.getSrcGroup()) > 0 - && findSubscriptionData == null) { - log.warn("[cloneGroupOffset], the consumer group[{}], topic[{}] not exist", - requestHeader.getSrcGroup(), topic); + this.brokerController.getConsumerManager().findSubscriptionData(requestHeader.getSrcGroup(), topic); + if (this.brokerController.getConsumerManager().findSubscriptionDataCount(requestHeader.getSrcGroup()) > 0 + && findSubscriptionData == null) { + log.warn("[cloneGroupOffset], the consumer group[{}], topic[{}] not exist", requestHeader.getSrcGroup(), topic); continue; } } - this.brokerController.getConsumerOffsetManager() - .cloneOffset(requestHeader.getSrcGroup(), requestHeader.getDestGroup(), - requestHeader.getTopic()); + this.brokerController.getConsumerOffsetManager().cloneOffset(requestHeader.getSrcGroup(), requestHeader.getDestGroup(), + requestHeader.getTopic()); } response.setCode(ResponseCode.SUCCESS); @@ -1180,20 +1065,16 @@ private RemotingCommand cloneGroupOffset(ChannelHandlerContext ctx, } private RemotingCommand ViewBrokerStatsData(ChannelHandlerContext ctx, - RemotingCommand request) throws RemotingCommandException { + RemotingCommand request) throws RemotingCommandException { final ViewBrokerStatsDataRequestHeader requestHeader = - (ViewBrokerStatsDataRequestHeader) request - .decodeCommandCustomHeader(ViewBrokerStatsDataRequestHeader.class); + (ViewBrokerStatsDataRequestHeader) request.decodeCommandCustomHeader(ViewBrokerStatsDataRequestHeader.class); final RemotingCommand response = RemotingCommand.createResponseCommand(null); MessageStore messageStore = this.brokerController.getMessageStore(); - StatsItem statsItem = messageStore.getBrokerStatsManager() - .getStatsItem(requestHeader.getStatsName(), requestHeader.getStatsKey()); + StatsItem statsItem = messageStore.getBrokerStatsManager().getStatsItem(requestHeader.getStatsName(), requestHeader.getStatsKey()); if (null == statsItem) { response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark( - String.format("The stats <%s> <%s> not exist", requestHeader.getStatsName(), - requestHeader.getStatsKey())); + response.setRemark(String.format("The stats <%s> <%s> not exist", requestHeader.getStatsName(), requestHeader.getStatsKey())); return response; } @@ -1232,31 +1113,27 @@ private RemotingCommand ViewBrokerStatsData(ChannelHandlerContext ctx, return response; } - private RemotingCommand fetchAllConsumeStatsInBroker(ChannelHandlerContext ctx, - RemotingCommand request) - throws RemotingCommandException { + private RemotingCommand fetchAllConsumeStatsInBroker(ChannelHandlerContext ctx, RemotingCommand request) + throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); GetConsumeStatsInBrokerHeader requestHeader = - (GetConsumeStatsInBrokerHeader) request - .decodeCommandCustomHeader(GetConsumeStatsInBrokerHeader.class); + (GetConsumeStatsInBrokerHeader) request.decodeCommandCustomHeader(GetConsumeStatsInBrokerHeader.class); boolean isOrder = requestHeader.isOrder(); ConcurrentMap subscriptionGroups = - brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable(); + brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable(); List>> brokerConsumeStatsList = - new ArrayList>>(); + new ArrayList>>(); long totalDiff = 0L; for (String group : subscriptionGroups.keySet()) { Map> subscripTopicConsumeMap = new HashMap>(); - Set topics = this.brokerController.getConsumerOffsetManager() - .whichTopicByConsumer(group); + Set topics = this.brokerController.getConsumerOffsetManager().whichTopicByConsumer(group); List consumeStatsList = new ArrayList(); for (String topic : topics) { ConsumeStats consumeStats = new ConsumeStats(); - TopicConfig topicConfig = this.brokerController.getTopicConfigManager() - .selectTopicConfig(topic); + TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic); if (null == topicConfig) { log.warn("consumeStats, topic config not exist, {}", topic); continue; @@ -1267,14 +1144,11 @@ private RemotingCommand fetchAllConsumeStatsInBroker(ChannelHandlerContext ctx, } { - SubscriptionData findSubscriptionData = this.brokerController - .getConsumerManager().findSubscriptionData(group, topic); + SubscriptionData findSubscriptionData = this.brokerController.getConsumerManager().findSubscriptionData(group, topic); if (null == findSubscriptionData - && this.brokerController.getConsumerManager() - .findSubscriptionDataCount(group) > 0) { - log.warn("consumeStats, the consumer group[{}], topic[{}] not exist", group, - topic); + && this.brokerController.getConsumerManager().findSubscriptionDataCount(group) > 0) { + log.warn("consumeStats, the consumer group[{}], topic[{}] not exist", group, topic); continue; } } @@ -1285,35 +1159,29 @@ private RemotingCommand fetchAllConsumeStatsInBroker(ChannelHandlerContext ctx, mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName()); mq.setQueueId(i); OffsetWrapper offsetWrapper = new OffsetWrapper(); - long brokerOffset = this.brokerController.getMessageStore() - .getMaxOffsetInQueue(topic, i); - if (brokerOffset < 0) { + long brokerOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, i); + if (brokerOffset < 0) brokerOffset = 0; - } - long consumerOffset = this.brokerController.getConsumerOffsetManager() - .queryOffset( - group, - topic, - i); - if (consumerOffset < 0) { + long consumerOffset = this.brokerController.getConsumerOffsetManager().queryOffset( + group, + topic, + i); + if (consumerOffset < 0) consumerOffset = 0; - } offsetWrapper.setBrokerOffset(brokerOffset); offsetWrapper.setConsumerOffset(consumerOffset); long timeOffset = consumerOffset - 1; if (timeOffset >= 0) { - long lastTimestamp = this.brokerController.getMessageStore() - .getMessageStoreTimeStamp(topic, i, timeOffset); + long lastTimestamp = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, timeOffset); if (lastTimestamp > 0) { offsetWrapper.setLastTimestamp(lastTimestamp); } } consumeStats.getOffsetTable().put(mq, offsetWrapper); } - double consumeTps = this.brokerController.getBrokerStatsManager() - .tpsGroupGetNums(group, topic); + double consumeTps = this.brokerController.getBrokerStatsManager().tpsGroupGetNums(group, topic); consumeTps += consumeStats.getConsumeTps(); consumeStats.setConsumeTps(consumeTps); totalDiff += consumeStats.computeTotalDiff(); @@ -1333,114 +1201,82 @@ private RemotingCommand fetchAllConsumeStatsInBroker(ChannelHandlerContext ctx, } private HashMap prepareRuntimeInfo() { - HashMap runtimeInfo = this.brokerController.getMessageStore() - .getRuntimeInfo(); + HashMap runtimeInfo = this.brokerController.getMessageStore().getRuntimeInfo(); runtimeInfo.put("brokerVersionDesc", MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION)); runtimeInfo.put("brokerVersion", String.valueOf(MQVersion.CURRENT_VERSION)); runtimeInfo.put("msgPutTotalYesterdayMorning", - String.valueOf( - this.brokerController.getBrokerStats().getMsgPutTotalYesterdayMorning())); - runtimeInfo.put("msgPutTotalTodayMorning", String.valueOf( - this.brokerController.getBrokerStats().getMsgPutTotalTodayMorning())); - runtimeInfo.put("msgPutTotalTodayNow", - String.valueOf(this.brokerController.getBrokerStats().getMsgPutTotalTodayNow())); + String.valueOf(this.brokerController.getBrokerStats().getMsgPutTotalYesterdayMorning())); + runtimeInfo.put("msgPutTotalTodayMorning", String.valueOf(this.brokerController.getBrokerStats().getMsgPutTotalTodayMorning())); + runtimeInfo.put("msgPutTotalTodayNow", String.valueOf(this.brokerController.getBrokerStats().getMsgPutTotalTodayNow())); runtimeInfo.put("msgGetTotalYesterdayMorning", - String.valueOf( - this.brokerController.getBrokerStats().getMsgGetTotalYesterdayMorning())); - runtimeInfo.put("msgGetTotalTodayMorning", String.valueOf( - this.brokerController.getBrokerStats().getMsgGetTotalTodayMorning())); - runtimeInfo.put("msgGetTotalTodayNow", - String.valueOf(this.brokerController.getBrokerStats().getMsgGetTotalTodayNow())); + String.valueOf(this.brokerController.getBrokerStats().getMsgGetTotalYesterdayMorning())); + runtimeInfo.put("msgGetTotalTodayMorning", String.valueOf(this.brokerController.getBrokerStats().getMsgGetTotalTodayMorning())); + runtimeInfo.put("msgGetTotalTodayNow", String.valueOf(this.brokerController.getBrokerStats().getMsgGetTotalTodayNow())); - runtimeInfo.put("sendThreadPoolQueueSize", - String.valueOf(this.brokerController.getSendThreadPoolQueue().size())); + runtimeInfo.put("sendThreadPoolQueueSize", String.valueOf(this.brokerController.getSendThreadPoolQueue().size())); runtimeInfo.put("sendThreadPoolQueueCapacity", - String.valueOf( - this.brokerController.getBrokerConfig().getSendThreadPoolQueueCapacity())); + String.valueOf(this.brokerController.getBrokerConfig().getSendThreadPoolQueueCapacity())); - runtimeInfo.put("pullThreadPoolQueueSize", - String.valueOf(this.brokerController.getPullThreadPoolQueue().size())); + runtimeInfo.put("pullThreadPoolQueueSize", String.valueOf(this.brokerController.getPullThreadPoolQueue().size())); runtimeInfo.put("pullThreadPoolQueueCapacity", - String.valueOf( - this.brokerController.getBrokerConfig().getPullThreadPoolQueueCapacity())); + String.valueOf(this.brokerController.getBrokerConfig().getPullThreadPoolQueueCapacity())); - runtimeInfo.put("queryThreadPoolQueueSize", - String.valueOf(this.brokerController.getQueryThreadPoolQueue().size())); + runtimeInfo.put("queryThreadPoolQueueSize", String.valueOf(this.brokerController.getQueryThreadPoolQueue().size())); runtimeInfo.put("queryThreadPoolQueueCapacity", - String.valueOf( - this.brokerController.getBrokerConfig().getQueryThreadPoolQueueCapacity())); + String.valueOf(this.brokerController.getBrokerConfig().getQueryThreadPoolQueueCapacity())); - runtimeInfo.put("EndTransactionQueueSize", - String.valueOf(this.brokerController.getEndTransactionThreadPoolQueue().size())); + runtimeInfo.put("EndTransactionQueueSize", String.valueOf(this.brokerController.getEndTransactionThreadPoolQueue().size())); runtimeInfo.put("EndTransactionThreadPoolQueueCapacity", - String.valueOf(this.brokerController.getBrokerConfig() - .getEndTransactionPoolQueueCapacity())); - - runtimeInfo.put("dispatchBehindBytes", - String.valueOf(this.brokerController.getMessageStore().dispatchBehindBytes())); - runtimeInfo.put("pageCacheLockTimeMills", - String.valueOf(this.brokerController.getMessageStore().lockTimeMills())); - - runtimeInfo.put("sendThreadPoolQueueHeadWaitTimeMills", - String.valueOf(this.brokerController.headSlowTimeMills4SendThreadPoolQueue())); - runtimeInfo.put("pullThreadPoolQueueHeadWaitTimeMills", - String.valueOf(this.brokerController.headSlowTimeMills4PullThreadPoolQueue())); - runtimeInfo.put("queryThreadPoolQueueHeadWaitTimeMills", - String.valueOf(this.brokerController.headSlowTimeMills4QueryThreadPoolQueue())); - - runtimeInfo.put("earliestMessageTimeStamp", - String.valueOf(this.brokerController.getMessageStore().getEarliestMessageTime())); - runtimeInfo.put("startAcceptSendRequestTimeStamp", String.valueOf( - this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp())); + String.valueOf(this.brokerController.getBrokerConfig().getEndTransactionPoolQueueCapacity())); + + runtimeInfo.put("dispatchBehindBytes", String.valueOf(this.brokerController.getMessageStore().dispatchBehindBytes())); + runtimeInfo.put("pageCacheLockTimeMills", String.valueOf(this.brokerController.getMessageStore().lockTimeMills())); + + runtimeInfo.put("sendThreadPoolQueueHeadWaitTimeMills", String.valueOf(this.brokerController.headSlowTimeMills4SendThreadPoolQueue())); + runtimeInfo.put("pullThreadPoolQueueHeadWaitTimeMills", String.valueOf(this.brokerController.headSlowTimeMills4PullThreadPoolQueue())); + runtimeInfo.put("queryThreadPoolQueueHeadWaitTimeMills", String.valueOf(this.brokerController.headSlowTimeMills4QueryThreadPoolQueue())); + + runtimeInfo.put("earliestMessageTimeStamp", String.valueOf(this.brokerController.getMessageStore().getEarliestMessageTime())); + runtimeInfo.put("startAcceptSendRequestTimeStamp", String.valueOf(this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp())); if (this.brokerController.getMessageStore() instanceof DefaultMessageStore) { - DefaultMessageStore defaultMessageStore = (DefaultMessageStore) this.brokerController - .getMessageStore(); - runtimeInfo.put("remainTransientStoreBufferNumbs", - String.valueOf(defaultMessageStore.remainTransientStoreBufferNumbs())); + DefaultMessageStore defaultMessageStore = (DefaultMessageStore) this.brokerController.getMessageStore(); + runtimeInfo.put("remainTransientStoreBufferNumbs", String.valueOf(defaultMessageStore.remainTransientStoreBufferNumbs())); if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) { - runtimeInfo.put("remainHowManyDataToCommit", MixAll.humanReadableByteCount( - defaultMessageStore.getCommitLog().remainHowManyDataToCommit(), false)); + runtimeInfo.put("remainHowManyDataToCommit", MixAll.humanReadableByteCount(defaultMessageStore.getCommitLog().remainHowManyDataToCommit(), false)); } - runtimeInfo.put("remainHowManyDataToFlush", MixAll.humanReadableByteCount( - defaultMessageStore.getCommitLog().remainHowManyDataToFlush(), false)); + runtimeInfo.put("remainHowManyDataToFlush", MixAll.humanReadableByteCount(defaultMessageStore.getCommitLog().remainHowManyDataToFlush(), false)); } - java.io.File commitLogDir = new java.io.File( - this.brokerController.getMessageStoreConfig().getStorePathRootDir()); + java.io.File commitLogDir = new java.io.File(this.brokerController.getMessageStoreConfig().getStorePathRootDir()); if (commitLogDir.exists()) { - runtimeInfo.put("commitLogDirCapacity", String.format("Total : %s, Free : %s.", - MixAll.humanReadableByteCount(commitLogDir.getTotalSpace(), false), - MixAll.humanReadableByteCount(commitLogDir.getFreeSpace(), false))); + runtimeInfo.put("commitLogDirCapacity", String.format("Total : %s, Free : %s.", MixAll.humanReadableByteCount(commitLogDir.getTotalSpace(), false), MixAll.humanReadableByteCount(commitLogDir.getFreeSpace(), false))); } return runtimeInfo; } private RemotingCommand callConsumer( - final int requestCode, - final RemotingCommand request, - final String consumerGroup, - final String clientId) throws RemotingCommandException { + final int requestCode, + final RemotingCommand request, + final String consumerGroup, + final String clientId) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); - ClientChannelInfo clientChannelInfo = this.brokerController.getConsumerManager() - .findChannel(consumerGroup, clientId); + ClientChannelInfo clientChannelInfo = this.brokerController.getConsumerManager().findChannel(consumerGroup, clientId); if (null == clientChannelInfo) { response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark( - String.format("The Consumer <%s> <%s> not online", consumerGroup, clientId)); + response.setRemark(String.format("The Consumer <%s> <%s> not online", consumerGroup, clientId)); return response; } if (clientChannelInfo.getVersion() < MQVersion.Version.V3_1_8_SNAPSHOT.ordinal()) { response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark(String.format( - "The Consumer <%s> Version <%s> too low to finish, please upgrade it to V3_1_8_SNAPSHOT", - clientId, - MQVersion.getVersionDesc(clientChannelInfo.getVersion()))); + response.setRemark(String.format("The Consumer <%s> Version <%s> too low to finish, please upgrade it to V3_1_8_SNAPSHOT", + clientId, + MQVersion.getVersionDesc(clientChannelInfo.getVersion()))); return response; } @@ -1449,39 +1285,32 @@ private RemotingCommand callConsumer( newRequest.setExtFields(request.getExtFields()); newRequest.setBody(request.getBody()); - return this.brokerController.getBroker2Client() - .callClient(clientChannelInfo.getChannel(), newRequest); + return this.brokerController.getBroker2Client().callClient(clientChannelInfo.getChannel(), newRequest); } catch (RemotingTimeoutException e) { response.setCode(ResponseCode.CONSUME_MSG_TIMEOUT); response - .setRemark( - String.format("consumer <%s> <%s> Timeout: %s", consumerGroup, clientId, - RemotingHelper.exceptionSimpleDesc(e))); + .setRemark(String.format("consumer <%s> <%s> Timeout: %s", consumerGroup, clientId, RemotingHelper.exceptionSimpleDesc(e))); return response; } catch (Exception e) { response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark( - String.format("invoke consumer <%s> <%s> Exception: %s", consumerGroup, - clientId, RemotingHelper.exceptionSimpleDesc(e))); + String.format("invoke consumer <%s> <%s> Exception: %s", consumerGroup, clientId, RemotingHelper.exceptionSimpleDesc(e))); return response; } } private RemotingCommand queryConsumeQueue(ChannelHandlerContext ctx, - RemotingCommand request) throws RemotingCommandException { + RemotingCommand request) throws RemotingCommandException { QueryConsumeQueueRequestHeader requestHeader = - (QueryConsumeQueueRequestHeader) request - .decodeCommandCustomHeader(QueryConsumeQueueRequestHeader.class); + (QueryConsumeQueueRequestHeader) request.decodeCommandCustomHeader(QueryConsumeQueueRequestHeader.class); RemotingCommand response = RemotingCommand.createResponseCommand(null); - ConsumeQueue consumeQueue = this.brokerController.getMessageStore() - .getConsumeQueue(requestHeader.getTopic(), - requestHeader.getQueueId()); + ConsumeQueue consumeQueue = this.brokerController.getMessageStore().getConsumeQueue(requestHeader.getTopic(), + requestHeader.getQueueId()); if (consumeQueue == null) { response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark(String.format("%d@%s is not exist!", requestHeader.getQueueId(), - requestHeader.getTopic())); + response.setRemark(String.format("%d@%s is not exist!", requestHeader.getQueueId(), requestHeader.getTopic())); return response; } @@ -1494,37 +1323,30 @@ private RemotingCommand queryConsumeQueue(ChannelHandlerContext ctx, MessageFilter messageFilter = null; if (requestHeader.getConsumerGroup() != null) { - SubscriptionData subscriptionData = this.brokerController.getConsumerManager() - .findSubscriptionData( - requestHeader.getConsumerGroup(), requestHeader.getTopic() - ); + SubscriptionData subscriptionData = this.brokerController.getConsumerManager().findSubscriptionData( + requestHeader.getConsumerGroup(), requestHeader.getTopic() + ); body.setSubscriptionData(subscriptionData); if (subscriptionData == null) { - body.setFilterData( - String.format("%s@%s is not online!", requestHeader.getConsumerGroup(), - requestHeader.getTopic())); + body.setFilterData(String.format("%s@%s is not online!", requestHeader.getConsumerGroup(), requestHeader.getTopic())); } else { ConsumerFilterData filterData = this.brokerController.getConsumerFilterManager() - .get(requestHeader.getTopic(), requestHeader.getConsumerGroup()); + .get(requestHeader.getTopic(), requestHeader.getConsumerGroup()); body.setFilterData(JSON.toJSONString(filterData, true)); messageFilter = new ExpressionMessageFilter(subscriptionData, filterData, - this.brokerController.getConsumerFilterManager()); + this.brokerController.getConsumerFilterManager()); } } SelectMappedBufferResult result = consumeQueue.getIndexBuffer(requestHeader.getIndex()); if (result == null) { - response.setRemark( - String.format("Index %d of %d@%s is not exist!", requestHeader.getIndex(), - requestHeader.getQueueId(), requestHeader.getTopic())); + response.setRemark(String.format("Index %d of %d@%s is not exist!", requestHeader.getIndex(), requestHeader.getQueueId(), requestHeader.getTopic())); return response; } try { List queues = new ArrayList<>(); - for (int i = 0; i < result.getSize() - && i < requestHeader.getCount() * ConsumeQueue.CQ_STORE_UNIT_SIZE; - i += ConsumeQueue.CQ_STORE_UNIT_SIZE) { + for (int i = 0; i < result.getSize() && i < requestHeader.getCount() * ConsumeQueue.CQ_STORE_UNIT_SIZE; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) { ConsumeQueueData one = new ConsumeQueueData(); one.setPhysicOffset(result.getByteBuffer().getLong()); one.setPhysicSize(result.getByteBuffer().getInt()); @@ -1542,8 +1364,7 @@ private RemotingCommand queryConsumeQueue(ChannelHandlerContext ctx, one.setBitMap(BitsArray.create(cqExtUnit.getFilterBitMap()).toString()); } if (messageFilter != null) { - one.setEval(messageFilter - .isMatchedByConsumeQueue(cqExtUnit.getTagsCode(), cqExtUnit)); + one.setEval(messageFilter.isMatchedByConsumeQueue(cqExtUnit.getTagsCode(), cqExtUnit)); } } else { one.setMsg("Cq extend not exist!addr: " + one.getTagsCode()); @@ -1559,33 +1380,28 @@ private RemotingCommand queryConsumeQueue(ChannelHandlerContext ctx, return response; } - private RemotingCommand resumeCheckHalfMessage(ChannelHandlerContext ctx, - RemotingCommand request) - throws RemotingCommandException { + RemotingCommand request) + throws RemotingCommandException { final ResumeCheckHalfMessageRequestHeader requestHeader = (ResumeCheckHalfMessageRequestHeader) request - .decodeCommandCustomHeader(ResumeCheckHalfMessageRequestHeader.class); + .decodeCommandCustomHeader(ResumeCheckHalfMessageRequestHeader.class); final RemotingCommand response = RemotingCommand.createResponseCommand(null); - ResumeCheckHalfMessageResult resumeCheckHalfMessageResult = new ResumeCheckHalfMessageResult(); SelectMappedBufferResult selectMappedBufferResult = null; - Long beginTime = System.currentTimeMillis(); try { MessageId messageId = MessageDecoder.decodeMessageId(requestHeader.getMsgId()); selectMappedBufferResult = this.brokerController.getMessageStore() - .selectOneMessageByOffset(messageId.getOffset()); + .selectOneMessageByOffset(messageId.getOffset()); MessageExt msg = MessageDecoder.decode(selectMappedBufferResult.getByteBuffer()); msg.putUserProperty(MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES, String.valueOf(0)); PutMessageResult putMessageResult = this.brokerController.getMessageStore() - .putMessage(toMessageExtBrokerInner(msg)); + .putMessage(toMessageExtBrokerInner(msg)); if (putMessageResult != null - && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) { + && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) { log.info( - "Put message back to RMQ_SYS_TRANS_HALF_TOPIC. real topic={}", - msg.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC)); + "Put message back to RMQ_SYS_TRANS_HALF_TOPIC. real topic={}", + msg.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC)); response.setCode(ResponseCode.SUCCESS); - resumeCheckHalfMessageResult.setResumeResult(ResumeResult.RESUME_SUCCESS); - resumeCheckHalfMessageResult.setSpentTimeMills(System.currentTimeMillis() - beginTime); - response.setBody(resumeCheckHalfMessageResult.encode()); + response.setRemark(null); } else { log.error("Put message back to RMQ_SYS_TRANS_HALF_TOPIC failed."); response.setCode(ResponseCode.SYSTEM_ERROR); @@ -1621,5 +1437,4 @@ private MessageExtBrokerInner toMessageExtBrokerInner(MessageExt msgExt) { inner.setWaitStoreMsgOK(false); return inner; } - } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java index 8652e0bb1b50..8f215cdcb9cd 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java @@ -220,7 +220,7 @@ public TopicConfig createTopicInSendMessageMethod(final String topic, final Stri } if (createNew) { - this.brokerController.registerBrokerAll(false, true,true); + this.brokerController.registerBrokerAll(false, true, true); } return topicConfig; @@ -264,7 +264,7 @@ public TopicConfig createTopicInSendMessageBackMethod( } if (createNew) { - this.brokerController.registerBrokerAll(false, true,true); + this.brokerController.registerBrokerAll(false, true, true); } return topicConfig; @@ -304,7 +304,7 @@ public TopicConfig createTopicOfTranCheckMaxTime(final int clientDefaultTopicQue } if (createNew) { - this.brokerController.registerBrokerAll(false, true,true); + this.brokerController.registerBrokerAll(false, true, true); } return topicConfig; @@ -329,7 +329,7 @@ public void updateTopicUnitFlag(final String topic, final boolean unit) { this.dataVersion.nextVersion(); this.persist(); - this.brokerController.registerBrokerAll(false, true,true); + this.brokerController.registerBrokerAll(false, true, true); } } @@ -349,7 +349,7 @@ public void updateTopicUnitSubFlag(final String topic, final boolean hasUnitSub) this.dataVersion.nextVersion(); this.persist(); - this.brokerController.registerBrokerAll(false, true,true); + this.brokerController.registerBrokerAll(false, true, true); } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListener.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListener.java index 80a0309a887f..ee87bd375300 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListener.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListener.java @@ -30,12 +30,8 @@ import org.apache.rocketmq.store.PutMessageResult; import org.apache.rocketmq.store.PutMessageStatus; -public class DefaultTransactionalMessageCheckListener extends - AbstractTransactionalMessageCheckListener { - - private static final InternalLogger log = InternalLoggerFactory - .getLogger(LoggerName.TRANSACTION_LOGGER_NAME); - +public class DefaultTransactionalMessageCheckListener extends AbstractTransactionalMessageCheckListener { + private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME); public DefaultTransactionalMessageCheckListener() { super(); @@ -43,26 +39,16 @@ public DefaultTransactionalMessageCheckListener() { @Override public void resolveDiscardMsg(MessageExt msgExt) { - log.error( - "MsgExt:{} has been checked too many times, so discard it by moving it to system topic TRANS_CHECK_MAXTIME_TOPIC", - msgExt); + log.error("MsgExt:{} has been checked too many times, so discard it by moving it to system topic TRANS_CHECK_MAXTIME_TOPIC", msgExt); try { MessageExtBrokerInner brokerInner = toMessageExtBrokerInner(msgExt); - PutMessageResult putMessageResult = this.getBrokerController().getMessageStore() - .putMessage(brokerInner); - if (putMessageResult != null - && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) { - log.info( - "Put checked-too-many-time half message to TRANS_CHECK_MAXTIME_TOPIC OK. Restored in queueOffset={}, " - + "commitLogOffset={}, real topic={}", - msgExt.getQueueOffset(), msgExt.getCommitLogOffset(), - msgExt.getUserProperty( - MessageConst.PROPERTY_REAL_TOPIC)); + PutMessageResult putMessageResult = this.getBrokerController().getMessageStore().putMessage(brokerInner); + if (putMessageResult != null && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) { + log.info("Put checked-too-many-time half message to TRANS_CHECK_MAXTIME_TOPIC OK. Restored in queueOffset={}, " + + "commitLogOffset={}, real topic={}", msgExt.getQueueOffset(), msgExt.getCommitLogOffset(), msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC)); } else { - log.error( - "Put checked-too-many-time half message to TRANS_CHECK_MAXTIME_TOPIC failed, real topic={}, msgId={}", - msgExt.getTopic(), msgExt.getMsgId()); + log.error("Put checked-too-many-time half message to TRANS_CHECK_MAXTIME_TOPIC failed, real topic={}, msgId={}", msgExt.getTopic(), msgExt.getMsgId()); } } catch (Exception e) { log.warn("Put checked-too-many-time message to TRANS_CHECK_MAXTIME_TOPIC error. {}", e); @@ -71,9 +57,7 @@ public void resolveDiscardMsg(MessageExt msgExt) { } private MessageExtBrokerInner toMessageExtBrokerInner(MessageExt msgExt) { - TopicConfig topicConfig = this.getBrokerController().getTopicConfigManager() - .createTopicOfTranCheckMaxTime(TCMT_QUEUE_NUMS, - PermName.PERM_READ | PermName.PERM_WRITE); + TopicConfig topicConfig = this.getBrokerController().getTopicConfigManager().createTopicOfTranCheckMaxTime(TCMT_QUEUE_NUMS, PermName.PERM_READ | PermName.PERM_WRITE); int queueId = Math.abs(random.nextInt() % 99999999) % TCMT_QUEUE_NUMS; MessageExtBrokerInner inner = new MessageExtBrokerInner(); inner.setTopic(topicConfig.getTopicName()); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java index 90de9205226b..501d2a2bc232 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java @@ -16,10 +16,6 @@ */ package org.apache.rocketmq.broker.processor; -import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.when; - import io.netty.channel.ChannelHandlerContext; import java.net.SocketAddress; import java.net.UnknownHostException; @@ -28,7 +24,6 @@ import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.message.MessageAccessor; import org.apache.rocketmq.common.message.MessageConst; -import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageId; import org.apache.rocketmq.common.protocol.RequestCode; @@ -52,12 +47,13 @@ import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.Spy; -import org.powermock.api.mockito.PowerMockito; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; +import org.mockito.junit.MockitoJUnitRunner; -@RunWith(PowerMockRunner.class) -@PrepareForTest({MessageDecoder.class}) +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) public class AdminBrokerProcessorTest { private AdminBrokerProcessor adminBrokerProcessor; @@ -84,9 +80,9 @@ public void init() { public void testProcessRequest_success() throws RemotingCommandException, UnknownHostException { RemotingCommand request = createResumeCheckHalfMessageCommand(); when(messageStore.selectOneMessageByOffset(any(Long.class))).thenReturn(createSelectMappedBufferResult()); - PowerMockito.mockStatic(MessageDecoder.class); - PowerMockito.when(MessageDecoder.decode(any(ByteBuffer.class))).thenReturn(createDefaultMessageExt()); - PowerMockito.when(MessageDecoder.decodeMessageId(any(String.class))).thenReturn(createMessageId()); +// PowerMockito.mockStatic(MessageDecoder.class); +// PowerMockito.when(MessageDecoder.decode(any(ByteBuffer.class))).thenReturn(createDefaultMessageExt()); +// PowerMockito.when(MessageDecoder.decodeMessageId(any(String.class))).thenReturn(createMessageId()); when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult (PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK))); RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request); @@ -97,28 +93,15 @@ public void testProcessRequest_success() throws RemotingCommandException, Unknow public void testProcessRequest_fail() throws RemotingCommandException, UnknownHostException { RemotingCommand request = createResumeCheckHalfMessageCommand(); when(messageStore.selectOneMessageByOffset(any(Long.class))).thenReturn(createSelectMappedBufferResult()); - PowerMockito.mockStatic(MessageDecoder.class); - PowerMockito.when(MessageDecoder.decode(any(ByteBuffer.class))).thenReturn(createDefaultMessageExt()); - PowerMockito.when(MessageDecoder.decodeMessageId(any(String.class))).thenReturn(createMessageId()); +// PowerMockito.mockStatic(MessageDecoder.class); +// PowerMockito.when(MessageDecoder.decode(any(ByteBuffer.class))).thenReturn(createDefaultMessageExt()); +// PowerMockito.when(MessageDecoder.decodeMessageId(any(String.class))).thenReturn(createMessageId()); when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult (PutMessageStatus.UNKNOWN_ERROR, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request); assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR); } - @Test - public void testProcessRequest_exception() throws RemotingCommandException, UnknownHostException { - RemotingCommand request = createResumeCheckHalfMessageCommand(); - when(messageStore.selectOneMessageByOffset(any(Long.class))).thenReturn(createSelectMappedBufferResult()); - PowerMockito.mockStatic(MessageDecoder.class); - PowerMockito.when(MessageDecoder.decode(any(ByteBuffer.class))).thenReturn(createDefaultMessageExt()); - PowerMockito.when(MessageDecoder.decodeMessageId(any(String.class))).thenThrow(new UnknownHostException()); - when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult - (PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK))); - RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request); - assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR); - } - private MessageExt createDefaultMessageExt() { MessageExt messageExt = new MessageExt(); messageExt.setMsgId("12345678"); @@ -142,7 +125,7 @@ private SelectMappedBufferResult createSelectMappedBufferResult(){ } private ResumeCheckHalfMessageRequestHeader createResumeCheckHalfMessageRequestHeader() { ResumeCheckHalfMessageRequestHeader header = new ResumeCheckHalfMessageRequestHeader(); - header.setMsgId("12345678"); + header.setMsgId("C0A803CA00002A9F0000000000031367"); return header; } diff --git a/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListenerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListenerTest.java index 3c5869e5fd7d..2866de613c1a 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListenerTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListenerTest.java @@ -45,9 +45,8 @@ public class DefaultTransactionalMessageCheckListenerTest { @Spy private BrokerController brokerController = new BrokerController(new BrokerConfig(), - new NettyServerConfig(), - new NettyClientConfig(), new MessageStoreConfig()); - + new NettyServerConfig(), + new NettyClientConfig(), new MessageStoreConfig()); @Before public void init() throws Exception { @@ -61,6 +60,7 @@ public void init() throws Exception { public void destroy() { // brokerController.shutdown(); } + @Test public void testResolveHalfMsg() { listener.resolveHalfMsg(createMessageExt()); @@ -81,10 +81,10 @@ private MessageExtBrokerInner createMessageExt() { MessageExtBrokerInner inner = new MessageExtBrokerInner(); MessageAccessor.putProperty(inner, MessageConst.PROPERTY_REAL_QUEUE_ID, "1"); MessageAccessor - .putProperty(inner, MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, "1234255"); + .putProperty(inner, MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, "1234255"); MessageAccessor.putProperty(inner, MessageConst.PROPERTY_REAL_TOPIC, "realTopic"); inner.setTransactionId( - inner.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX)); + inner.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX)); inner.setBody("check".getBytes()); inner.setMsgId("12344567890"); inner.setQueueId(0); @@ -97,20 +97,15 @@ public void testResolveDiscardMsg() { messageExt.setTopic(MixAll.RMQ_SYS_TRANS_HALF_TOPIC); messageExt.setQueueId(0); messageExt.setBody("test resolve discard msg".getBytes()); - messageExt.setStoreHost(new InetSocketAddress("127.0.0.1",10911)); - messageExt.setBornHost(new InetSocketAddress("127.0.0.1",54270)); + messageExt.setStoreHost(new InetSocketAddress("127.0.0.1", 10911)); + messageExt.setBornHost(new InetSocketAddress("127.0.0.1", 54270)); MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_REAL_TOPIC, "test_topic"); - MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_PRODUCER_GROUP, - "PID_TEST_DISCARD_MSG"); + MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_PRODUCER_GROUP, "PID_TEST_DISCARD_MSG"); MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true"); - MessageAccessor - .putProperty(messageExt, MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES, "15"); - MessageAccessor - .putProperty(messageExt, MessageConst.PROPERTY_REAL_QUEUE_ID, "2"); - MessageAccessor - .putProperty(messageExt, MessageConst.PROPERTY_TAGS, "test_discard_msg"); - MessageAccessor - .putProperty(messageExt, MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, "AC14157E4F1C18B4AAC27EB1A0F30000"); + MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES, "15"); + MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_REAL_QUEUE_ID, "2"); + MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_TAGS, "test_discard_msg"); + MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, "AC14157E4F1C18B4AAC27EB1A0F30000"); listener.resolveDiscardMsg(messageExt); } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index be25b7e172d8..f5fb885c08b3 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -76,7 +76,6 @@ import org.apache.rocketmq.common.protocol.body.QueryCorrectionOffsetBody; import org.apache.rocketmq.common.protocol.body.QueueTimeSpan; import org.apache.rocketmq.common.protocol.body.ResetOffsetBody; -import org.apache.rocketmq.common.protocol.body.ResumeCheckHalfMessageResult; import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper; import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper; import org.apache.rocketmq.common.protocol.body.TopicList; @@ -2092,28 +2091,23 @@ public void checkClientInBroker(final String brokerAddr, final String consumerGr } } - public ResumeCheckHalfMessageResult resumeCheckHalfMessage(final String addr, String msgId, - final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException { + public boolean resumeCheckHalfMessage(final String addr, String msgId, + final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException { ResumeCheckHalfMessageRequestHeader requestHeader = new ResumeCheckHalfMessageRequestHeader(); requestHeader.setMsgId(msgId); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.RESUME_CHECK_HALF_MESSAGE, requestHeader); RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), - request, timeoutMillis); + request, timeoutMillis); assert response != null; switch (response.getCode()) { case ResponseCode.SUCCESS: { - byte[] body = response.getBody(); - if (body != null) { - ResumeCheckHalfMessageResult info = ResumeCheckHalfMessageResult.decode(body, ResumeCheckHalfMessageResult.class); - return info; - } + return true; } default: - break; + log.error("Failed to resume half message check logic. Remark={}", response.getRemark()); + return false; } - - throw new MQClientException(response.getCode(), response.getRemark()); } } diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java index c13e75c206c7..57310963607b 100644 --- a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java @@ -19,6 +19,7 @@ import java.lang.reflect.Field; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.hook.SendMessageContext; import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl; import org.apache.rocketmq.client.producer.DefaultMQProducer; @@ -62,7 +63,7 @@ public class MQClientAPIImplTest { @Mock private DefaultMQProducerImpl defaultMQProducerImpl; - private String brokerAddr = "127.0.0.1"; + private String brokerAddr = "127.0.0.1:10911"; private String brokerName = "DefaultBroker"; private static String group = "FooBarGroup"; private static String topic = "FooBar"; @@ -162,7 +163,7 @@ public void testSendMessageAsync_Success() throws RemotingException, Interrupted public Object answer(InvocationOnMock mock) throws Throwable { InvokeCallback callback = mock.getArgument(3); RemotingCommand request = mock.getArgument(1); - ResponseFuture responseFuture = new ResponseFuture(null,request.getOpaque(), 3 * 1000, null, null); + ResponseFuture responseFuture = new ResponseFuture(null, request.getOpaque(), 3 * 1000, null, null); responseFuture.setResponseCommand(createSuccessResponse(request)); callback.operationComplete(responseFuture); return null; @@ -210,6 +211,39 @@ public void testSendMessageAsync_WithException() throws RemotingException, Inter } } + @Test + public void testResumeCheckHalfMessage_WithException() throws RemotingException, InterruptedException, MQBrokerException, MQClientException { + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock mock) throws Throwable { + RemotingCommand request = mock.getArgument(1); + RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class); + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setOpaque(request.getOpaque()); + response.setRemark("Put message back to RMQ_SYS_TRANS_HALF_TOPIC failed."); + return response; + } + }).when(remotingClient).invokeSync(anyString(), any(RemotingCommand.class), anyLong()); + + boolean result = mqClientAPI.resumeCheckHalfMessage(brokerAddr, "test", 3000); + assertThat(result).isEqualTo(false); + } + + @Test + public void testResumeCheckHalfMessage_Success() throws InterruptedException, RemotingException, MQBrokerException, MQClientException { + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock mock) throws Throwable { + RemotingCommand request = mock.getArgument(1); + return createResumeSuccessResponse(request); + } + }).when(remotingClient).invokeSync(anyString(), any(RemotingCommand.class), anyLong()); + + boolean result = mqClientAPI.resumeCheckHalfMessage(brokerAddr, "test", 3000); + + assertThat(result).isEqualTo(true); + } + private RemotingCommand createSuccessResponse(RemotingCommand request) { RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class); response.setCode(ResponseCode.SUCCESS); @@ -228,6 +262,13 @@ private RemotingCommand createSuccessResponse(RemotingCommand request) { return response; } + private RemotingCommand createResumeSuccessResponse(RemotingCommand request) { + RemotingCommand response = RemotingCommand.createResponseCommand(null); + response.setCode(ResponseCode.SUCCESS); + response.setOpaque(request.getOpaque()); + return response; + } + private SendMessageRequestHeader createSendMessageRequestHeader() { SendMessageRequestHeader requestHeader = new SendMessageRequestHeader(); requestHeader.setBornTimestamp(System.currentTimeMillis()); diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ResumeCheckHalfMessageResult.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ResumeCheckHalfMessageResult.java deleted file mode 100644 index 58dabf446a8c..000000000000 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ResumeCheckHalfMessageResult.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.common.protocol.body; - -import org.apache.rocketmq.remoting.protocol.RemotingSerializable; - -public class ResumeCheckHalfMessageResult extends RemotingSerializable { - - private ResumeResult resumeResult; - private String remark; - private long spentTimeMills; - - - public ResumeResult getResumeResult() { - return resumeResult; - } - - public void setResumeResult(ResumeResult resumeResult) { - this.resumeResult = resumeResult; - } - - public String getRemark() { - return remark; - } - - public void setRemark(String remark) { - this.remark = remark; - } - - - public long getSpentTimeMills() { - return spentTimeMills; - } - - public void setSpentTimeMills(long spentTimeMills) { - this.spentTimeMills = spentTimeMills; - } - - @Override - public String toString() { - return "ConsumeMessageDirectlyResult [consumeResult=" + resumeResult + ", remark=" + remark - + ", spentTimeMills=" - + spentTimeMills + "]"; - } -} diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ResumeResult.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ResumeResult.java deleted file mode 100644 index 583c737f3dd7..000000000000 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ResumeResult.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.common.protocol.body; - -public enum ResumeResult { - RESUME_SUCCESS, - RESUME_THROW_EXCEPTION -} diff --git a/pom.xml b/pom.xml index 48a4829c66cc..7dd5d7f46dbe 100644 --- a/pom.xml +++ b/pom.xml @@ -457,18 +457,6 @@ 2.23.0 test - - org.powermock - powermock-api-mockito2 - 2.0.0 - test - - - org.powermock - powermock-module-junit4 - 2.0.0 - test - diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java index c8e476464f82..6eb9d9ca1499 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java @@ -43,7 +43,6 @@ import org.apache.rocketmq.common.protocol.body.ProducerConnection; import org.apache.rocketmq.common.protocol.body.QueryConsumeQueueResponseBody; import org.apache.rocketmq.common.protocol.body.QueueTimeSpan; -import org.apache.rocketmq.common.protocol.body.ResumeCheckHalfMessageResult; import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper; import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper; import org.apache.rocketmq.common.protocol.body.TopicList; @@ -516,13 +515,13 @@ public QueryConsumeQueueResponseBody queryConsumeQueue(String brokerAddr, String } @Override - public ResumeCheckHalfMessageResult resumeCheckHalfMessage(String msgId) + public boolean resumeCheckHalfMessage(String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException { return this.defaultMQAdminExtImpl.resumeCheckHalfMessage(msgId); } @Override - public ResumeCheckHalfMessageResult resumeCheckHalfMessage(String topic, + public boolean resumeCheckHalfMessage(String topic, String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException { return this.defaultMQAdminExtImpl.resumeCheckHalfMessage(topic, msgId); diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java index 1446fd6df23e..6ca493736c8f 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java @@ -47,7 +47,6 @@ import org.apache.rocketmq.common.admin.TopicOffset; import org.apache.rocketmq.common.admin.TopicStatsTable; import org.apache.rocketmq.common.help.FAQUrl; -import org.apache.rocketmq.common.protocol.body.ResumeCheckHalfMessageResult; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.common.message.MessageClientExt; import org.apache.rocketmq.common.message.MessageConst; @@ -1003,7 +1002,7 @@ public QueryConsumeQueueResponseBody queryConsumeQueue(String brokerAddr, String } @Override - public ResumeCheckHalfMessageResult resumeCheckHalfMessage(String msgId) + public boolean resumeCheckHalfMessage(String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException { MessageExt msg = this.viewMessage(msgId); @@ -1011,7 +1010,7 @@ public ResumeCheckHalfMessageResult resumeCheckHalfMessage(String msgId) } @Override - public ResumeCheckHalfMessageResult resumeCheckHalfMessage(final String topic, final String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException { + public boolean resumeCheckHalfMessage(final String topic, final String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException { MessageExt msg = this.viewMessage(topic, msgId); if (msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX) == null) { return this.mqClientInstance.getMQClientAPIImpl().resumeCheckHalfMessage(RemotingUtil.socketAddress2String(msg.getStoreHost()), msgId, timeoutMillis); diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java index 84dcb7aca741..7b0069346139 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java @@ -41,7 +41,6 @@ import org.apache.rocketmq.common.protocol.body.ProducerConnection; import org.apache.rocketmq.common.protocol.body.QueryConsumeQueueResponseBody; import org.apache.rocketmq.common.protocol.body.QueueTimeSpan; -import org.apache.rocketmq.common.protocol.body.ResumeCheckHalfMessageResult; import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper; import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper; import org.apache.rocketmq.common.protocol.body.TopicList; @@ -261,8 +260,8 @@ QueryConsumeQueueResponseBody queryConsumeQueue(final String brokerAddr, final long index, final int count, final String consumerGroup) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException; - ResumeCheckHalfMessageResult resumeCheckHalfMessage(String msgId) + boolean resumeCheckHalfMessage(String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException; - ResumeCheckHalfMessageResult resumeCheckHalfMessage(final String topic, final String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException; + boolean resumeCheckHalfMessage(final String topic, final String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException; } diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommandTest.java new file mode 100644 index 000000000000..49c48faa9d41 --- /dev/null +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommandTest.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tools.command.message; + + import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; +import org.apache.rocketmq.srvutil.ServerUtil; +import org.apache.rocketmq.tools.command.SubCommandException; +import org.junit.Test; + + public class QueryMsgByUniqueKeySubCommandTest { + + @Test + public void testExecute() throws SubCommandException { + + System.setProperty("rocketmq.namesrv.addr", "127.0.0.1:9876"); + + QueryMsgByUniqueKeySubCommand cmd = new QueryMsgByUniqueKeySubCommand(); + String[] args = new String[]{"-t myTopicTest", "-i 0A3A54F7BF7D18B4AAC28A3FA2CF0000"}; + Options options = ServerUtil.buildCommandlineOptions(new Options()); + CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin ", args, cmd.buildCommandlineOptions(options), new PosixParser()); + cmd.execute(commandLine, options, null); + + args = new String[]{"-t myTopicTest", "-i 0A3A54F7BF7D18B4AAC28A3FA2CF0000", "-g producerGroupName", "-d clientId"}; + commandLine = ServerUtil.parseCmdLine("mqadmin ", args, cmd.buildCommandlineOptions(options), new PosixParser()); + cmd.execute(commandLine, options, null); + + } + + } \ No newline at end of file