Skip to content

Commit

Permalink
add logic of resuming half message check
Browse files Browse the repository at this point in the history
  • Loading branch information
chengxiangwang committed Jan 14, 2019
1 parent 8a2344b commit 36aff5b
Show file tree
Hide file tree
Showing 10 changed files with 848 additions and 241 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.broker.processor;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;

import io.netty.channel.ChannelHandlerContext;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageId;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.ResumeCheckHalfMessageRequestHeader;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.AppendMessageResult;
import org.apache.rocketmq.store.AppendMessageStatus;
import org.apache.rocketmq.store.MappedFile;
import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus;
import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Spy;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

@RunWith(PowerMockRunner.class)
@PrepareForTest({MessageDecoder.class})
public class AdminBrokerProcessorTest {

private AdminBrokerProcessor adminBrokerProcessor;

@Mock
private ChannelHandlerContext handlerContext;

@Spy
private BrokerController
brokerController = new BrokerController(new BrokerConfig(), new NettyServerConfig(), new NettyClientConfig(),
new MessageStoreConfig());

@Mock
private MessageStore messageStore;


@Before
public void init() {
brokerController.setMessageStore(messageStore);
adminBrokerProcessor = new AdminBrokerProcessor(brokerController);
}

@Test
public void testProcessRequest_success() throws RemotingCommandException, UnknownHostException {
RemotingCommand request = createResumeCheckHalfMessageCommand();
when(messageStore.selectOneMessageByOffset(any(Long.class))).thenReturn(createSelectMappedBufferResult());
PowerMockito.mockStatic(MessageDecoder.class);
PowerMockito.when(MessageDecoder.decode(any(ByteBuffer.class))).thenReturn(createDefaultMessageExt());
PowerMockito.when(MessageDecoder.decodeMessageId(any(String.class))).thenReturn(createMessageId());
when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult
(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK)));
RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
}

@Test
public void testProcessRequest_fail() throws RemotingCommandException, UnknownHostException {
RemotingCommand request = createResumeCheckHalfMessageCommand();
when(messageStore.selectOneMessageByOffset(any(Long.class))).thenReturn(createSelectMappedBufferResult());
PowerMockito.mockStatic(MessageDecoder.class);
PowerMockito.when(MessageDecoder.decode(any(ByteBuffer.class))).thenReturn(createDefaultMessageExt());
PowerMockito.when(MessageDecoder.decodeMessageId(any(String.class))).thenReturn(createMessageId());
when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult
(PutMessageStatus.UNKNOWN_ERROR, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));
RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
}

@Test
public void testProcessRequest_exception() throws RemotingCommandException, UnknownHostException {
RemotingCommand request = createResumeCheckHalfMessageCommand();
when(messageStore.selectOneMessageByOffset(any(Long.class))).thenReturn(createSelectMappedBufferResult());
PowerMockito.mockStatic(MessageDecoder.class);
PowerMockito.when(MessageDecoder.decode(any(ByteBuffer.class))).thenReturn(createDefaultMessageExt());
PowerMockito.when(MessageDecoder.decodeMessageId(any(String.class))).thenThrow(new UnknownHostException());
when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult
(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK)));
RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
}

private MessageExt createDefaultMessageExt() {
MessageExt messageExt = new MessageExt();
messageExt.setMsgId("12345678");
messageExt.setQueueId(0);
messageExt.setCommitLogOffset(123456789L);
messageExt.setQueueOffset(1234);
MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_REAL_QUEUE_ID, "0");
MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_REAL_TOPIC, "testTopic");
MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES, "15");
return messageExt;
}

private MessageId createMessageId() {
return new MessageId(new SocketAddress() {
}, 0);
}

private SelectMappedBufferResult createSelectMappedBufferResult(){
SelectMappedBufferResult result = new SelectMappedBufferResult(0, ByteBuffer.allocate(1024) ,0, new MappedFile());
return result;
}
private ResumeCheckHalfMessageRequestHeader createResumeCheckHalfMessageRequestHeader() {
ResumeCheckHalfMessageRequestHeader header = new ResumeCheckHalfMessageRequestHeader();
header.setMsgId("12345678");
return header;
}

private RemotingCommand createResumeCheckHalfMessageCommand() {
ResumeCheckHalfMessageRequestHeader header = createResumeCheckHalfMessageRequestHeader();
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.RESUME_CHECK_HALF_MESSAGE, header);
request.makeCustomHeaderToNet();
return request;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import org.apache.rocketmq.common.protocol.body.QueryCorrectionOffsetBody;
import org.apache.rocketmq.common.protocol.body.QueueTimeSpan;
import org.apache.rocketmq.common.protocol.body.ResetOffsetBody;
import org.apache.rocketmq.common.protocol.body.ResumeCheckHalfMessageResult;
import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.protocol.body.TopicList;
Expand Down Expand Up @@ -113,6 +114,7 @@
import org.apache.rocketmq.common.protocol.header.QueryMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.QueryTopicConsumeByWhoRequestHeader;
import org.apache.rocketmq.common.protocol.header.ResetOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.header.ResumeCheckHalfMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.SearchOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.header.SearchOffsetResponseHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
Expand Down Expand Up @@ -2089,4 +2091,29 @@ public void checkClientInBroker(final String brokerAddr, final String consumerGr
throw new MQClientException(response.getCode(), response.getRemark());
}
}

public ResumeCheckHalfMessageResult resumeCheckHalfMessage(final String addr, String msgId,
final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
ResumeCheckHalfMessageRequestHeader requestHeader = new ResumeCheckHalfMessageRequestHeader();
requestHeader.setMsgId(msgId);

RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.RESUME_CHECK_HALF_MESSAGE, requestHeader);

RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
byte[] body = response.getBody();
if (body != null) {
ResumeCheckHalfMessageResult info = ResumeCheckHalfMessageResult.decode(body, ResumeCheckHalfMessageResult.class);
return info;
}
}
default:
break;
}

throw new MQClientException(response.getCode(), response.getRemark());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.common.protocol.body;

import org.apache.rocketmq.remoting.protocol.RemotingSerializable;

public class ResumeCheckHalfMessageResult extends RemotingSerializable {

private ResumeResult resumeResult;
private String remark;
private long spentTimeMills;


public ResumeResult getResumeResult() {
return resumeResult;
}

public void setResumeResult(ResumeResult resumeResult) {
this.resumeResult = resumeResult;
}

public String getRemark() {
return remark;
}

public void setRemark(String remark) {
this.remark = remark;
}


public long getSpentTimeMills() {
return spentTimeMills;
}

public void setSpentTimeMills(long spentTimeMills) {
this.spentTimeMills = spentTimeMills;
}

@Override
public String toString() {
return "ConsumeMessageDirectlyResult [consumeResult=" + resumeResult + ", remark=" + remark
+ ", spentTimeMills="
+ spentTimeMills + "]";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.common.protocol.body;

public enum ResumeResult {
RESUME_SUCCESS,
RESUME_THROW_EXCEPTION
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.common.protocol.header;

import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNullable;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;

public class ResumeCheckHalfMessageRequestHeader implements CommandCustomHeader {
@CFNullable
private String msgId;

@Override
public void checkFields() throws RemotingCommandException {

}

public String getMsgId() {
return msgId;
}

public void setMsgId(String msgId) {
this.msgId = msgId;
}

@Override
public String toString() {
return "ResumeCheckHalfMessageRequestHeader [msgId=" + msgId + "]";
}
}
14 changes: 13 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,19 @@
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>2.6.3</version>
<version>2.23.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-api-mockito2</artifactId>
<version>2.0.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-module-junit4</artifactId>
<version>2.0.0</version>
<scope>test</scope>
</dependency>
</dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.rocketmq.common.protocol.body.ProducerConnection;
import org.apache.rocketmq.common.protocol.body.QueryConsumeQueueResponseBody;
import org.apache.rocketmq.common.protocol.body.QueueTimeSpan;
import org.apache.rocketmq.common.protocol.body.ResumeCheckHalfMessageResult;
import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.protocol.body.TopicList;
Expand Down Expand Up @@ -513,4 +514,17 @@ public QueryConsumeQueueResponseBody queryConsumeQueue(String brokerAddr, String
brokerAddr, topic, queueId, index, count, consumerGroup
);
}

@Override
public ResumeCheckHalfMessageResult resumeCheckHalfMessage(String msgId)
throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
return this.defaultMQAdminExtImpl.resumeCheckHalfMessage(msgId);
}

@Override
public ResumeCheckHalfMessageResult resumeCheckHalfMessage(String topic,
String msgId)
throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
return this.defaultMQAdminExtImpl.resumeCheckHalfMessage(topic, msgId);
}
}
Loading

0 comments on commit 36aff5b

Please sign in to comment.