Skip to content

Commit

Permalink
add test case:resumeCheckHalfMessage
Browse files Browse the repository at this point in the history
  • Loading branch information
chengxiangwang committed Jun 17, 2019
1 parent 80be5e9 commit 584a31c
Show file tree
Hide file tree
Showing 14 changed files with 351 additions and 629 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ public TopicConfig createTopicInSendMessageMethod(final String topic, final Stri
}

if (createNew) {
this.brokerController.registerBrokerAll(false, true,true);
this.brokerController.registerBrokerAll(false, true, true);
}

return topicConfig;
Expand Down Expand Up @@ -264,7 +264,7 @@ public TopicConfig createTopicInSendMessageBackMethod(
}

if (createNew) {
this.brokerController.registerBrokerAll(false, true,true);
this.brokerController.registerBrokerAll(false, true, true);
}

return topicConfig;
Expand Down Expand Up @@ -304,7 +304,7 @@ public TopicConfig createTopicOfTranCheckMaxTime(final int clientDefaultTopicQue
}

if (createNew) {
this.brokerController.registerBrokerAll(false, true,true);
this.brokerController.registerBrokerAll(false, true, true);
}

return topicConfig;
Expand All @@ -329,7 +329,7 @@ public void updateTopicUnitFlag(final String topic, final boolean unit) {
this.dataVersion.nextVersion();

this.persist();
this.brokerController.registerBrokerAll(false, true,true);
this.brokerController.registerBrokerAll(false, true, true);
}
}

Expand All @@ -349,7 +349,7 @@ public void updateTopicUnitSubFlag(final String topic, final boolean hasUnitSub)
this.dataVersion.nextVersion();

this.persist();
this.brokerController.registerBrokerAll(false, true,true);
this.brokerController.registerBrokerAll(false, true, true);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,39 +30,25 @@
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus;

public class DefaultTransactionalMessageCheckListener extends
AbstractTransactionalMessageCheckListener {

private static final InternalLogger log = InternalLoggerFactory
.getLogger(LoggerName.TRANSACTION_LOGGER_NAME);

public class DefaultTransactionalMessageCheckListener extends AbstractTransactionalMessageCheckListener {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME);

public DefaultTransactionalMessageCheckListener() {
super();
}

@Override
public void resolveDiscardMsg(MessageExt msgExt) {
log.error(
"MsgExt:{} has been checked too many times, so discard it by moving it to system topic TRANS_CHECK_MAXTIME_TOPIC",
msgExt);
log.error("MsgExt:{} has been checked too many times, so discard it by moving it to system topic TRANS_CHECK_MAXTIME_TOPIC", msgExt);

try {
MessageExtBrokerInner brokerInner = toMessageExtBrokerInner(msgExt);
PutMessageResult putMessageResult = this.getBrokerController().getMessageStore()
.putMessage(brokerInner);
if (putMessageResult != null
&& putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
log.info(
"Put checked-too-many-time half message to TRANS_CHECK_MAXTIME_TOPIC OK. Restored in queueOffset={}, "
+ "commitLogOffset={}, real topic={}",
msgExt.getQueueOffset(), msgExt.getCommitLogOffset(),
msgExt.getUserProperty(
MessageConst.PROPERTY_REAL_TOPIC));
PutMessageResult putMessageResult = this.getBrokerController().getMessageStore().putMessage(brokerInner);
if (putMessageResult != null && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
log.info("Put checked-too-many-time half message to TRANS_CHECK_MAXTIME_TOPIC OK. Restored in queueOffset={}, " +
"commitLogOffset={}, real topic={}", msgExt.getQueueOffset(), msgExt.getCommitLogOffset(), msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
} else {
log.error(
"Put checked-too-many-time half message to TRANS_CHECK_MAXTIME_TOPIC failed, real topic={}, msgId={}",
msgExt.getTopic(), msgExt.getMsgId());
log.error("Put checked-too-many-time half message to TRANS_CHECK_MAXTIME_TOPIC failed, real topic={}, msgId={}", msgExt.getTopic(), msgExt.getMsgId());
}
} catch (Exception e) {
log.warn("Put checked-too-many-time message to TRANS_CHECK_MAXTIME_TOPIC error. {}", e);
Expand All @@ -71,9 +57,7 @@ public void resolveDiscardMsg(MessageExt msgExt) {
}

private MessageExtBrokerInner toMessageExtBrokerInner(MessageExt msgExt) {
TopicConfig topicConfig = this.getBrokerController().getTopicConfigManager()
.createTopicOfTranCheckMaxTime(TCMT_QUEUE_NUMS,
PermName.PERM_READ | PermName.PERM_WRITE);
TopicConfig topicConfig = this.getBrokerController().getTopicConfigManager().createTopicOfTranCheckMaxTime(TCMT_QUEUE_NUMS, PermName.PERM_READ | PermName.PERM_WRITE);
int queueId = Math.abs(random.nextInt() % 99999999) % TCMT_QUEUE_NUMS;
MessageExtBrokerInner inner = new MessageExtBrokerInner();
inner.setTopic(topicConfig.getTopicName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,6 @@
*/
package org.apache.rocketmq.broker.processor;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;

import io.netty.channel.ChannelHandlerContext;
import java.net.SocketAddress;
import java.net.UnknownHostException;
Expand All @@ -28,7 +24,6 @@
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageId;
import org.apache.rocketmq.common.protocol.RequestCode;
Expand All @@ -52,12 +47,13 @@
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Spy;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.mockito.junit.MockitoJUnitRunner;

@RunWith(PowerMockRunner.class)
@PrepareForTest({MessageDecoder.class})
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;

@RunWith(MockitoJUnitRunner.class)
public class AdminBrokerProcessorTest {

private AdminBrokerProcessor adminBrokerProcessor;
Expand All @@ -84,9 +80,9 @@ public void init() {
public void testProcessRequest_success() throws RemotingCommandException, UnknownHostException {
RemotingCommand request = createResumeCheckHalfMessageCommand();
when(messageStore.selectOneMessageByOffset(any(Long.class))).thenReturn(createSelectMappedBufferResult());
PowerMockito.mockStatic(MessageDecoder.class);
PowerMockito.when(MessageDecoder.decode(any(ByteBuffer.class))).thenReturn(createDefaultMessageExt());
PowerMockito.when(MessageDecoder.decodeMessageId(any(String.class))).thenReturn(createMessageId());
// PowerMockito.mockStatic(MessageDecoder.class);
// PowerMockito.when(MessageDecoder.decode(any(ByteBuffer.class))).thenReturn(createDefaultMessageExt());
// PowerMockito.when(MessageDecoder.decodeMessageId(any(String.class))).thenReturn(createMessageId());
when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult
(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK)));
RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
Expand All @@ -97,28 +93,15 @@ public void testProcessRequest_success() throws RemotingCommandException, Unknow
public void testProcessRequest_fail() throws RemotingCommandException, UnknownHostException {
RemotingCommand request = createResumeCheckHalfMessageCommand();
when(messageStore.selectOneMessageByOffset(any(Long.class))).thenReturn(createSelectMappedBufferResult());
PowerMockito.mockStatic(MessageDecoder.class);
PowerMockito.when(MessageDecoder.decode(any(ByteBuffer.class))).thenReturn(createDefaultMessageExt());
PowerMockito.when(MessageDecoder.decodeMessageId(any(String.class))).thenReturn(createMessageId());
// PowerMockito.mockStatic(MessageDecoder.class);
// PowerMockito.when(MessageDecoder.decode(any(ByteBuffer.class))).thenReturn(createDefaultMessageExt());
// PowerMockito.when(MessageDecoder.decodeMessageId(any(String.class))).thenReturn(createMessageId());
when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult
(PutMessageStatus.UNKNOWN_ERROR, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));
RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
}

@Test
public void testProcessRequest_exception() throws RemotingCommandException, UnknownHostException {
RemotingCommand request = createResumeCheckHalfMessageCommand();
when(messageStore.selectOneMessageByOffset(any(Long.class))).thenReturn(createSelectMappedBufferResult());
PowerMockito.mockStatic(MessageDecoder.class);
PowerMockito.when(MessageDecoder.decode(any(ByteBuffer.class))).thenReturn(createDefaultMessageExt());
PowerMockito.when(MessageDecoder.decodeMessageId(any(String.class))).thenThrow(new UnknownHostException());
when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult
(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK)));
RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
}

private MessageExt createDefaultMessageExt() {
MessageExt messageExt = new MessageExt();
messageExt.setMsgId("12345678");
Expand All @@ -142,7 +125,7 @@ private SelectMappedBufferResult createSelectMappedBufferResult(){
}
private ResumeCheckHalfMessageRequestHeader createResumeCheckHalfMessageRequestHeader() {
ResumeCheckHalfMessageRequestHeader header = new ResumeCheckHalfMessageRequestHeader();
header.setMsgId("12345678");
header.setMsgId("C0A803CA00002A9F0000000000031367");
return header;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,8 @@ public class DefaultTransactionalMessageCheckListenerTest {

@Spy
private BrokerController brokerController = new BrokerController(new BrokerConfig(),
new NettyServerConfig(),
new NettyClientConfig(), new MessageStoreConfig());

new NettyServerConfig(),
new NettyClientConfig(), new MessageStoreConfig());

@Before
public void init() throws Exception {
Expand All @@ -61,6 +60,7 @@ public void init() throws Exception {
public void destroy() {
// brokerController.shutdown();
}

@Test
public void testResolveHalfMsg() {
listener.resolveHalfMsg(createMessageExt());
Expand All @@ -81,10 +81,10 @@ private MessageExtBrokerInner createMessageExt() {
MessageExtBrokerInner inner = new MessageExtBrokerInner();
MessageAccessor.putProperty(inner, MessageConst.PROPERTY_REAL_QUEUE_ID, "1");
MessageAccessor
.putProperty(inner, MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, "1234255");
.putProperty(inner, MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, "1234255");
MessageAccessor.putProperty(inner, MessageConst.PROPERTY_REAL_TOPIC, "realTopic");
inner.setTransactionId(
inner.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
inner.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
inner.setBody("check".getBytes());
inner.setMsgId("12344567890");
inner.setQueueId(0);
Expand All @@ -97,20 +97,15 @@ public void testResolveDiscardMsg() {
messageExt.setTopic(MixAll.RMQ_SYS_TRANS_HALF_TOPIC);
messageExt.setQueueId(0);
messageExt.setBody("test resolve discard msg".getBytes());
messageExt.setStoreHost(new InetSocketAddress("127.0.0.1",10911));
messageExt.setBornHost(new InetSocketAddress("127.0.0.1",54270));
messageExt.setStoreHost(new InetSocketAddress("127.0.0.1", 10911));
messageExt.setBornHost(new InetSocketAddress("127.0.0.1", 54270));
MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_REAL_TOPIC, "test_topic");
MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_PRODUCER_GROUP,
"PID_TEST_DISCARD_MSG");
MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_PRODUCER_GROUP, "PID_TEST_DISCARD_MSG");
MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
MessageAccessor
.putProperty(messageExt, MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES, "15");
MessageAccessor
.putProperty(messageExt, MessageConst.PROPERTY_REAL_QUEUE_ID, "2");
MessageAccessor
.putProperty(messageExt, MessageConst.PROPERTY_TAGS, "test_discard_msg");
MessageAccessor
.putProperty(messageExt, MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, "AC14157E4F1C18B4AAC27EB1A0F30000");
MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES, "15");
MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_REAL_QUEUE_ID, "2");
MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_TAGS, "test_discard_msg");
MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, "AC14157E4F1C18B4AAC27EB1A0F30000");
listener.resolveDiscardMsg(messageExt);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@
import org.apache.rocketmq.common.protocol.body.QueryCorrectionOffsetBody;
import org.apache.rocketmq.common.protocol.body.QueueTimeSpan;
import org.apache.rocketmq.common.protocol.body.ResetOffsetBody;
import org.apache.rocketmq.common.protocol.body.ResumeCheckHalfMessageResult;
import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.protocol.body.TopicList;
Expand Down Expand Up @@ -2210,28 +2209,23 @@ public void checkClientInBroker(final String brokerAddr, final String consumerGr
}
}

public ResumeCheckHalfMessageResult resumeCheckHalfMessage(final String addr, String msgId,
final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
public boolean resumeCheckHalfMessage(final String addr, String msgId,
final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
ResumeCheckHalfMessageRequestHeader requestHeader = new ResumeCheckHalfMessageRequestHeader();
requestHeader.setMsgId(msgId);

RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.RESUME_CHECK_HALF_MESSAGE, requestHeader);

RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
request, timeoutMillis);
request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
byte[] body = response.getBody();
if (body != null) {
ResumeCheckHalfMessageResult info = ResumeCheckHalfMessageResult.decode(body, ResumeCheckHalfMessageResult.class);
return info;
}
return true;
}
default:
break;
log.error("Failed to resume half message check logic. Remark={}", response.getRemark());
return false;
}

throw new MQClientException(response.getCode(), response.getRemark());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,12 @@
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.ClusterAclVersionInfo;
import org.apache.rocketmq.common.protocol.header.CreateAccessConfigRequestHeader;
import org.apache.rocketmq.common.protocol.header.GetBrokerAclConfigResponseHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;
import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RemotingClient;
import org.apache.rocketmq.remoting.exception.RemotingException;
Expand Down Expand Up @@ -294,6 +289,45 @@ public Object answer(InvocationOnMock mock) throws Throwable {
assertThat(ex.getErrorMessage()).isEqualTo("corresponding to accessConfig has been deleted failed");
}
}
@Test
public void testResumeCheckHalfMessage_WithException() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock mock) throws Throwable {
RemotingCommand request = mock.getArgument(1);
RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setOpaque(request.getOpaque());
response.setRemark("Put message back to RMQ_SYS_TRANS_HALF_TOPIC failed.");
return response;
}
}).when(remotingClient).invokeSync(anyString(), any(RemotingCommand.class), anyLong());

boolean result = mqClientAPI.resumeCheckHalfMessage(brokerAddr, "test", 3000);
assertThat(result).isEqualTo(false);
}

@Test
public void testResumeCheckHalfMessage_Success() throws InterruptedException, RemotingException, MQBrokerException, MQClientException {
doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock mock) throws Throwable {
RemotingCommand request = mock.getArgument(1);
return createResumeSuccessResponse(request);
}
}).when(remotingClient).invokeSync(anyString(), any(RemotingCommand.class), anyLong());

boolean result = mqClientAPI.resumeCheckHalfMessage(brokerAddr, "test", 3000);

assertThat(result).isEqualTo(true);
}

private RemotingCommand createResumeSuccessResponse(RemotingCommand request) {
RemotingCommand response = RemotingCommand.createResponseCommand(null);
response.setCode(ResponseCode.SUCCESS);
response.setOpaque(request.getOpaque());
return response;
}

private RemotingCommand createSuccessResponse(RemotingCommand request) {
RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
Expand Down Expand Up @@ -329,7 +363,7 @@ private RemotingCommand createSuccessResponse4DeleteAclConfig(RemotingCommand re
response.setOpaque(request.getOpaque());
response.markResponseType();
response.setRemark(null);

return response;
}

Expand Down
Loading

0 comments on commit 584a31c

Please sign in to comment.