-
Notifications
You must be signed in to change notification settings - Fork 11.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[ISSUE #598] Enhance transaction by putting messages that exceed max check times to system topic #633
[ISSUE #598] Enhance transaction by putting messages that exceed max check times to system topic #633
Conversation
@xiangwangcheng please rebase first and make sure this PR only contains your commit history. |
8cf0d6a
to
5034136
Compare
@duhenglucky I already did rebase and now the commit history is much better. |
36aff5b
to
c8f1fe3
Compare
c8f1fe3
to
8b56256
Compare
TopicConfig topicConfig = this.getBrokerController().getTopicConfigManager() | ||
.createTopicOfTranCheckMaxTime(TCMT_QUEUE_NUMS, | ||
PermName.PERM_READ | PermName.PERM_WRITE); | ||
int queueId = Math.abs(random.nextInt() % 99999999) % TCMT_QUEUE_NUMS; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The TRANS_CHECK_MAX_TIME_TOPIC's queue num is 1,so the variable queueId should be set to 1,directly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, as @zongtanghu comment, it has the difference with sendBackMessage method, so if only one queue in this inner topic, please use a constant instead.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for @xiangwangcheng's excellent PR, and we have put forward some comments for reference.
|
||
import org.apache.rocketmq.remoting.protocol.RemotingSerializable; | ||
|
||
public class ResumeCheckHalfMessageResult extends RemotingSerializable { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Put this part of the content into the header and it seems that we don't need an API return some metrics data in response.
|
||
package org.apache.rocketmq.common.protocol.body; | ||
|
||
public enum ResumeResult { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMO, it would be nice if we can use the ResponseCode directly.
@@ -454,7 +454,19 @@ | |||
<dependency> | |||
<groupId>org.mockito</groupId> | |||
<artifactId>mockito-core</artifactId> | |||
<version>2.6.3</version> | |||
<version>2.23.0</version> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why need to import other mockito packages here?
|
||
if (requestHeader.getTopic().equals(this.brokerController.getBrokerConfig().getBrokerClusterName())) { | ||
String errorMsg = "the topic[" + requestHeader.getTopic() + "] is conflict with system reserved words."; | ||
(CreateTopicRequestHeader) request |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If no other changes, please don't reformat the code :)
|
||
public DefaultTransactionalMessageCheckListener() { | ||
super(); | ||
} | ||
|
||
@Override | ||
public void resolveDiscardMsg(MessageExt msgExt) { | ||
log.error("MsgExt:{} has been checked too many times, so discard it", msgExt); | ||
log.error( | ||
"MsgExt:{} has been checked too many times, so discard it by moving it to system topic TRANS_CHECK_MAXTIME_TOPIC", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if we can change the log format to "[NOTIFYME]MsgExt: ...."?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
TopicConfig topicConfig = this.getBrokerController().getTopicConfigManager() | ||
.createTopicOfTranCheckMaxTime(TCMT_QUEUE_NUMS, | ||
PermName.PERM_READ | PermName.PERM_WRITE); | ||
int queueId = Math.abs(random.nextInt() % 99999999) % TCMT_QUEUE_NUMS; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, as @zongtanghu comment, it has the difference with sendBackMessage method, so if only one queue in this inner topic, please use a constant instead.
780b2bb
to
76d700e
Compare
broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
Outdated
Show resolved
Hide resolved
broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
Outdated
Show resolved
Hide resolved
Firstly,please resolve the confict codes in this pr. @xiangwangcheng And then,please help to review this pr. @ShannonDing @jonnxu @wlliqipeng Thanks very much. |
…opic TRANS_CHECK_MAXTIME_TOPIC
71c1df1
to
729f33f
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
LGTM |
good idea, LGTM |
729f33f
to
8f973de
Compare
…d max check times to system topic (apache#633) * add logic of putting message that exceeds max-check-times to system topic TRANS_CHECK_MAXTIME_TOPIC * add test case:testResolveDiscardMsg * add @after logic to test case * comment brokerController.shutdown and use mock * add logic of resuming half message check * add test case:resumeCheckHalfMessage * delete commented codes
…d max check times to system topic (apache#633) * add logic of putting message that exceeds max-check-times to system topic TRANS_CHECK_MAXTIME_TOPIC * add test case:testResolveDiscardMsg * add @after logic to test case * comment brokerController.shutdown and use mock * add logic of resuming half message check * add test case:resumeCheckHalfMessage * delete commented codes
What is the purpose of the change
Fix #598
Brief changelog
For half messages that check times exceed the max check times(15 by default), not doing anything in here
and instead of this, put it into a system topic will be more meaningful. And you can also reopen message check logic manually.
(The logic of reopen message check will be submitted later.)
Brief Design:
Modification:
resumeCheckHalfMessage
method to AdminBrokerProcessor.java to recieve resume request from client.resumeCheckHalfMessage
in MQClientAPIImpl.java to send resume request to broker.Verifying this change
XXXX
Follow this checklist to help us incorporate your contribution quickly and easily. Notice,
it would be helpful if you could finish the following 5 checklist(the last one is not necessary)before request the community to review your PR
.[ISSUE #123] Fix UnknownException when host config not exist
. Each commit in the pull request should have a meaningful subject line and body.mvn -B clean apache-rat:check findbugs:findbugs checkstyle:checkstyle
to make sure basic checks pass. Runmvn clean install -DskipITs
to make sure unit-test pass. Runmvn clean test-compile failsafe:integration-test
to make sure integration-test pass.