Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #598] Enhance transaction by putting messages that exceed max check times to system topic #633

Merged
merged 7 commits into from
Jun 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
import org.apache.rocketmq.broker.filter.ConsumerFilterData;
import org.apache.rocketmq.broker.filter.ExpressionMessageFilter;
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.PlainAccessConfig;
Expand All @@ -53,7 +54,10 @@
import org.apache.rocketmq.common.protocol.header.UpdateGlobalWhiteAddrsConfigRequestHeader;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageId;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.RequestCode;
Expand Down Expand Up @@ -100,6 +104,7 @@
import org.apache.rocketmq.common.protocol.header.QueryCorrectionOffsetHeader;
import org.apache.rocketmq.common.protocol.header.QueryTopicConsumeByWhoRequestHeader;
import org.apache.rocketmq.common.protocol.header.ResetOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.header.ResumeCheckHalfMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.SearchOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.header.SearchOffsetResponseHeader;
import org.apache.rocketmq.common.protocol.header.ViewBrokerStatsDataRequestHeader;
Expand All @@ -120,8 +125,11 @@
import org.apache.rocketmq.store.ConsumeQueue;
import org.apache.rocketmq.store.ConsumeQueueExt;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.MessageFilter;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus;
import org.apache.rocketmq.store.SelectMappedBufferResult;

public class AdminBrokerProcessor implements NettyRequestProcessor {
Expand Down Expand Up @@ -216,6 +224,8 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx,
return getBrokerAclConfigVersion(ctx, request);
case RequestCode.UPDATE_GLOBAL_WHITE_ADDRS_CONFIG:
return updateGlobalWhiteAddrsConfig(ctx, request);
case RequestCode.RESUME_CHECK_HALF_MESSAGE:
return resumeCheckHalfMessage(ctx, request);
default:
break;
}
Expand Down Expand Up @@ -262,7 +272,7 @@ private synchronized RemotingCommand updateAndCreateTopic(ChannelHandlerContext

this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);

this.brokerController.registerIncrementBrokerData(topicConfig,this.brokerController.getTopicConfigManager().getDataVersion());
this.brokerController.registerIncrementBrokerData(topicConfig, this.brokerController.getTopicConfigManager().getDataVersion());

return null;
}
Expand Down Expand Up @@ -1518,4 +1528,62 @@ private RemotingCommand queryConsumeQueue(ChannelHandlerContext ctx,

return response;
}

private RemotingCommand resumeCheckHalfMessage(ChannelHandlerContext ctx,
RemotingCommand request)
throws RemotingCommandException {
final ResumeCheckHalfMessageRequestHeader requestHeader = (ResumeCheckHalfMessageRequestHeader) request
.decodeCommandCustomHeader(ResumeCheckHalfMessageRequestHeader.class);
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
SelectMappedBufferResult selectMappedBufferResult = null;
try {
MessageId messageId = MessageDecoder.decodeMessageId(requestHeader.getMsgId());
selectMappedBufferResult = this.brokerController.getMessageStore()
.selectOneMessageByOffset(messageId.getOffset());
MessageExt msg = MessageDecoder.decode(selectMappedBufferResult.getByteBuffer());
msg.putUserProperty(MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES, String.valueOf(0));
PutMessageResult putMessageResult = this.brokerController.getMessageStore()
.putMessage(toMessageExtBrokerInner(msg));
if (putMessageResult != null
&& putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
log.info(
"Put message back to RMQ_SYS_TRANS_HALF_TOPIC. real topic={}",
msg.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
} else {
log.error("Put message back to RMQ_SYS_TRANS_HALF_TOPIC failed.");
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("Put message back to RMQ_SYS_TRANS_HALF_TOPIC failed.");
}
} catch (Exception e) {
log.error("Exception was thrown when putting message back to RMQ_SYS_TRANS_HALF_TOPIC.");
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("Exception was thrown when putting message back to RMQ_SYS_TRANS_HALF_TOPIC.");
} finally {
if (selectMappedBufferResult != null) {
selectMappedBufferResult.release();
}
}
return response;
}

private MessageExtBrokerInner toMessageExtBrokerInner(MessageExt msgExt) {
MessageExtBrokerInner inner = new MessageExtBrokerInner();
inner.setTopic(TransactionalMessageUtil.buildHalfTopic());
inner.setBody(msgExt.getBody());
inner.setFlag(msgExt.getFlag());
MessageAccessor.setProperties(inner, msgExt.getProperties());
inner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
inner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(msgExt.getTags()));
inner.setQueueId(0);
inner.setSysFlag(msgExt.getSysFlag());
inner.setBornHost(msgExt.getBornHost());
inner.setBornTimestamp(msgExt.getBornTimestamp());
inner.setStoreHost(msgExt.getStoreHost());
inner.setReconsumeTimes(msgExt.getReconsumeTimes());
inner.setMsgId(msgExt.getMsgId());
inner.setWaitStoreMsgOK(false);
return inner;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ public TopicConfig createTopicInSendMessageMethod(final String topic, final Stri
}

if (createNew) {
this.brokerController.registerBrokerAll(false, true,true);
this.brokerController.registerBrokerAll(false, true, true);
}

return topicConfig;
Expand Down Expand Up @@ -264,7 +264,47 @@ public TopicConfig createTopicInSendMessageBackMethod(
}

if (createNew) {
this.brokerController.registerBrokerAll(false, true,true);
this.brokerController.registerBrokerAll(false, true, true);
}

return topicConfig;
}

public TopicConfig createTopicOfTranCheckMaxTime(final int clientDefaultTopicQueueNums, final int perm) {
TopicConfig topicConfig = this.topicConfigTable.get(MixAll.TRANS_CHECK_MAX_TIME_TOPIC);
if (topicConfig != null)
return topicConfig;

boolean createNew = false;

try {
if (this.lockTopicConfigTable.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
topicConfig = this.topicConfigTable.get(MixAll.TRANS_CHECK_MAX_TIME_TOPIC);
if (topicConfig != null)
return topicConfig;

topicConfig = new TopicConfig(MixAll.TRANS_CHECK_MAX_TIME_TOPIC);
topicConfig.setReadQueueNums(clientDefaultTopicQueueNums);
topicConfig.setWriteQueueNums(clientDefaultTopicQueueNums);
topicConfig.setPerm(perm);
topicConfig.setTopicSysFlag(0);

log.info("create new topic {}", topicConfig);
this.topicConfigTable.put(MixAll.TRANS_CHECK_MAX_TIME_TOPIC, topicConfig);
createNew = true;
this.dataVersion.nextVersion();
this.persist();
} finally {
this.lockTopicConfigTable.unlock();
}
}
} catch (InterruptedException e) {
log.error("create TRANS_CHECK_MAX_TIME_TOPIC exception", e);
}

if (createNew) {
this.brokerController.registerBrokerAll(false, true, true);
}

return topicConfig;
Expand All @@ -289,7 +329,7 @@ public void updateTopicUnitFlag(final String topic, final boolean unit) {
this.dataVersion.nextVersion();

this.persist();
this.brokerController.registerBrokerAll(false, true,true);
this.brokerController.registerBrokerAll(false, true, true);
}
}

Expand All @@ -309,7 +349,7 @@ public void updateTopicUnitSubFlag(final String topic, final boolean hasUnitSub)
this.dataVersion.nextVersion();

this.persist();
this.brokerController.registerBrokerAll(false, true,true);
this.brokerController.registerBrokerAll(false, true, true);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.rocketmq.broker.transaction;

import io.netty.channel.Channel;
import java.util.Random;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageConst;
Expand All @@ -36,6 +37,10 @@ public abstract class AbstractTransactionalMessageCheckListener {

private BrokerController brokerController;

//queue nums of topic TRANS_CHECK_MAX_TIME_TOPIC
protected final static int TCMT_QUEUE_NUMS = 1;
protected final Random random = new Random(System.currentTimeMillis());

private static ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,18 @@
package org.apache.rocketmq.broker.transaction.queue;

import org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus;

public class DefaultTransactionalMessageCheckListener extends AbstractTransactionalMessageCheckListener {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME);
Expand All @@ -31,6 +39,41 @@ public DefaultTransactionalMessageCheckListener() {

@Override
public void resolveDiscardMsg(MessageExt msgExt) {
log.error("MsgExt:{} has been checked too many times, so discard it", msgExt);
log.error("MsgExt:{} has been checked too many times, so discard it by moving it to system topic TRANS_CHECK_MAXTIME_TOPIC", msgExt);

try {
MessageExtBrokerInner brokerInner = toMessageExtBrokerInner(msgExt);
PutMessageResult putMessageResult = this.getBrokerController().getMessageStore().putMessage(brokerInner);
if (putMessageResult != null && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
log.info("Put checked-too-many-time half message to TRANS_CHECK_MAXTIME_TOPIC OK. Restored in queueOffset={}, " +
"commitLogOffset={}, real topic={}", msgExt.getQueueOffset(), msgExt.getCommitLogOffset(), msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
} else {
log.error("Put checked-too-many-time half message to TRANS_CHECK_MAXTIME_TOPIC failed, real topic={}, msgId={}", msgExt.getTopic(), msgExt.getMsgId());
}
} catch (Exception e) {
log.warn("Put checked-too-many-time message to TRANS_CHECK_MAXTIME_TOPIC error. {}", e);
}

}

private MessageExtBrokerInner toMessageExtBrokerInner(MessageExt msgExt) {
TopicConfig topicConfig = this.getBrokerController().getTopicConfigManager().createTopicOfTranCheckMaxTime(TCMT_QUEUE_NUMS, PermName.PERM_READ | PermName.PERM_WRITE);
int queueId = Math.abs(random.nextInt() % 99999999) % TCMT_QUEUE_NUMS;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The TRANS_CHECK_MAX_TIME_TOPIC's queue num is 1,so the variable queueId should be set to 1,directly.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, as @zongtanghu comment, it has the difference with sendBackMessage method, so if only one queue in this inner topic, please use a constant instead.

MessageExtBrokerInner inner = new MessageExtBrokerInner();
inner.setTopic(topicConfig.getTopicName());
inner.setBody(msgExt.getBody());
inner.setFlag(msgExt.getFlag());
MessageAccessor.setProperties(inner, msgExt.getProperties());
inner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
inner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(msgExt.getTags()));
inner.setQueueId(queueId);
inner.setSysFlag(msgExt.getSysFlag());
inner.setBornHost(msgExt.getBornHost());
inner.setBornTimestamp(msgExt.getBornTimestamp());
inner.setStoreHost(msgExt.getStoreHost());
inner.setReconsumeTimes(msgExt.getReconsumeTimes());
inner.setMsgId(msgExt.getMsgId());
inner.setWaitStoreMsgOK(false);
return inner;
}
}
Loading