Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #292] Add support of transactional message feature #358

Merged
merged 6 commits into from
Jul 15, 2018

Conversation

duhengforever
Copy link
Contributor

Please do not create a Pull Request without creating an issue first.

What is the purpose of the change

In order to implement eventual consistency in distributed system, we define a new type of message named transactional message, it can be thought of as a two-phase commit message implementation to ensure eventual consistency in distributed system. Transactional message ensures that the execution of local transaction and the sending of message can be performed atomically.

Brief changelog

In our implementation, we made full use of the mechanism of rocketMQ itself to avoid external dependencies, and we defined a pair of message: named half message and commit/rollback message. Half message means prepared message, which are not visible to consumers,and it will be sent before local transaction is executed. After the local transaction ends,producer will send a commit or rollback message to broker based on local transaction execution result

Verifying this change

This is a trivial change.

Follow this checklist to help us incorporate your contribution quickly and easily:

  • Make sure there is a Github issue filed for the change (usually before you start working on it). Trivial changes like typos do not require a Github issue. Your pull request should address just this issue, without pulling in other changes - one PR resolves one issue.
  • Format the pull request title like [ISSUE #123] Fix UnknownException when host config not exist. Each commit in the pull request should have a meaningful subject line and body.
  • Write a pull request description that is detailed enough to understand what the pull request does, how, and why.
  • Write necessary unit-test to verify your logic correction, more mock a little better when cross module dependency exist. If the new feature or significant change is committed, please remember to add integration-test in test module.
  • Run mvn -B clean apache-rat:check findbugs:findbugs checkstyle:checkstyle to make sure basic checks pass. Run mvn clean install -DskipITs to make sure unit-test pass. Run mvn clean test-compile failsafe:integration-test to make sure integration-test pass.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

@duhengforever
Copy link
Contributor Author

Related Issue:
@lizhanhui @vongosling @dongeforever Have any comments and review about this PR?

@coveralls
Copy link

coveralls commented Jul 9, 2018

Coverage Status

Coverage increased (+0.8%) to 42.258% when pulling 835e013 on duhengforever:develop into 94f3b5a on apache:develop.

@lizhanhui
Copy link
Contributor

This is a big pull request, containing bulks of changes...Please wait patiently and I would join you as soon as possible.

Copy link
Member

@vongosling vongosling left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

first review

}

class TransactionCheckListenerBImpl implements TransactionCheckListener {
//class TransactionExecuterBImpl implements LocalTransactionExecuter {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The extra commented code has been removed in the latest commit,thanks for your reminding.


public TransactionCheckListenerBImpl(boolean ischeckffalse,
StatsBenchmarkTProducer statsBenchmarkTProducer) {
public TransactionListenerImpl(boolean ischeckffalse, boolean isCheckLocal,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ischeckffalse ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, the last commit did not modify the original code, including variable naming, but the latest commit has been changed.

+ tranStateTableOffset + ", commitLogOffset=" + commitLogOffset + ", commitOrRollback="
+ commitOrRollback + ", fromTransactionCheck=" + fromTransactionCheck + ", msgId=" + msgId
+ "]";
return "EndTransactionRequestHeader{" +
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ToStringBuilder is easier way to reflect all field in one class

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I used IDEA's automatic generation tool to override the toString method.

@@ -17,7 +17,7 @@
package org.apache.rocketmq.client.producer;

import org.apache.rocketmq.common.message.Message;

@Deprecated
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

?

@@ -18,7 +18,7 @@
package org.apache.rocketmq.broker.transaction;

import java.util.List;

@Deprecated
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This interface has been deleted in the latest commit.

@vongosling
Copy link
Member

@shroman @Jaskey Could you help us to review the pr together?

final Channel channel,
final CheckTransactionStateRequestHeader requestHeader,
final SelectMappedBufferResult selectMappedBufferResult) {
final MessageExt messageExt) throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this Exception be specific?

Copy link
Contributor Author

@duhengforever duhengforever Jul 10, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your review,in fact, I don't want to throw any exceptions here,but considering that the encode method may cause exception,so in this method just throws exception following with the encode method.

Channel channel = channelList.get(index);
int count = 0;
boolean isOk = channel.isActive() && channel.isWritable();
while (isOk && count++ < 3) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

3 is a magic number

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will use a constant to represent this value, which represents the number of retries for fetch an available channel.

} catch (Throwable e) {
log.error("invokeProducer exception", e);
selectMappedBufferResult.release();
this.brokerController.getRemotingServer().invokeOneway(channel, request, 10);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

10 ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an empirical value when using the invokeOneWay method. In the check scenario , we don't need to wait for the producer‘s response, and in order to prevent too many threads waiting,we adopted this configuration,but we also considered that whether it is necessary to make this parameter configurable,what’s your opinion?

package org.apache.rocketmq.broker.transaction;

@Deprecated
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

deprecated only but missing the deadline and replacement is not a good practice


private static final int TRY_PULL_MSG_NUMBER = 1;

private final int queueTime = 60000;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In order to avoid a queue check taking up too much time and leading to other queues to starve, we added this configuration,but I will change the variable name to uppercase.

@vongosling
Copy link
Member

vongosling commented Jul 11, 2018

LGTM if a rational response to my point out. This is an awesome feature, @lizhanhui @Jaskey @shroman Could we speed up the feedback for my kindly proposer, I would like to integrate into our develop branch :-) BTW, @duhengforever , do we have a wiki for the architecture and user guide?

@vongosling
Copy link
Member

@duhengforever Forget to comment, What is the coverage of the new code, over 80%?

@duhengforever
Copy link
Contributor Author

For transactional messages, I added some documentation, including examples and design documentation., I hope these documents can help you to review this PR, thanks!

@xingfudeshi
Copy link
Member

xingfudeshi commented Jul 13, 2018

great job!!it seems the good things are coming soon!

@@ -142,6 +150,9 @@
private BrokerFastFailure brokerFastFailure;
private Configuration configuration;
private FileWatchService fileWatchService;
private TransactionalMessageCheckService transactionMsgCheckService;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use consistent naming. If abbreviation were used, so were these relevant.

public void start() {
if (started.compareAndSet(false, true)) {
super.start();
this.brokerController.getTransactionalMessageService().open();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why open?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Considering that in other storage media implementations, maybe need to open some related settings, so provided this method.

}

@Override
public void run() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should take advantage of what ServiceThread have already provided. No

while(true)  {
...
sleep(xxx);
...
}

Use onWaitEnd() instead.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean waitForRunning() and onWaitEnd() methods? yeah, I used these two methods to transform this implementation in the latest commit.

public class TransactionalMessageServiceImpl implements TransactionalMessageService {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME);

private TransactionalMessageBridge transactionBridge;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having TransactionalMessageService --> TransactionalMessageServiceImpl abstraction, is it necesary to add another proxy?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In fact, in the development process, I have considered this problem, I personally think it is not needed, and we provide the Serviceloader to load other implementations. and if add another proxy,it will make the call hierarchy deeper and deeper, and need to add a strategy to ensure loading to the correct implementation,not convenient for later code maintenance.

return transactionBridge.putHalfMessage(messageInner);
}

private boolean isNeedDiscard(MessageExt msgExt, int transactionCheckMax) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isNeedXXX naming is weird and does not follow grammar. Using needXXX suffices.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good suggestion

MessageExtBrokerInner msgInner = transactionBridge.renewHalfMessageInner(messageExt);
putMessageResult = transactionBridge.putMessageReturnResult(msgInner);
} catch (Exception e) {
e.printStackTrace();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just printStackTrace?
We need to recover from the error and mark tx service unavailable and propagate the unavailable state to producer clients if recovering is not possible.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for helping to point out the problem,output to the log will be a good choice, as in this place, The outer method uses this method to decide whether to send back the request ,based on the status of putMessageResult, so there's no need to propagate the unavailable state to producer.

@lizhanhui
Copy link
Contributor

  1. Newly added classes, member variables & methods should be well documented, explaining their role in your solution.
  2. Corner cases are not yet properly handled. For example, if pushing back half message experiences failure, what is the recovering procedure?
  3. How is the HA guaranteed in case the master is down?
  4. We prefer to use full-word in the identifier, it would be better if this pull request may follow this convention.
  5. I am wondering whether the service locator and tx service bridge is an excessive abstraction. IMO, simplicity takes precedence.
  6. A lot of the tx related configurations are hard-coded, making them configurable is preferable.

@duhengforever
Copy link
Contributor Author

@lizhanhui thanks for your excellent review firstly.

  1. There are comments on the key methods, and the class name directly describes the role of the current class.
  2. Some corner cases have been considered and verified,and for the problem of put half message back failed, in fact, we only send a check request after the put back is successful. and if put back failed, we will continue to process this message, and in order to avoid causing unlimited check,we also added the limit of check times for the message, and in this situation, we will also print an error log for easy monitoring and manual processing.
  3. In fact, the HA mechanism of the transaction message depends on the HA of the rocketMQ itself. In order to avoid the message not being lost, we recommend using the synchronous double-write mechanism and manually upgrading the slave when the master down.
  4. I have been changed the identifier to the full-word.
  5. I don't think it's an excessive abstraction, I think transaction message is an application of ordinary message, so I need a bridge to avoid directly manipulating the relevant storage of MQ.
  6. Some hard code have been removed, please review the latest commit.

@vongosling
Copy link
Member

Glad to see the great work in the community, I would like to merge the pr if all comments are resolved

@vongosling vongosling merged commit b054a9d into apache:develop Jul 15, 2018
@xingfudeshi
Copy link
Member

congrats!

@klaus-pd
Copy link

@duhengforever Why do we not implement transactions checking based on database, and how to confirm previous transaction information if the service is down?

index = (++index) % size;
channel = channelList.get(index);
return channel;
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here if isOk= true will return next channel.
i think it should be like
while ( count++ < GET_AVALIABLE_CHANNEL_RETRY_COUNT) { if(isOk){ return channel; } index = (++index) % size; channel = channelList.get(index); isOk = channel.isActive() && channel.isWritable(); }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your careful review,and I will fix this problem ASAP.

@duhengforever
Copy link
Contributor Author

@klaus-pd In order to avoid external dependencies, such as database, we used the mechanism of rocketMQ itself to implement the check logic. and if the producer down, broker will send check request to other producers with same produceId.

@klaus-pd
Copy link

@duhengforever I know, but if only one producer of the same produceId, how to solve the problem of checking back?

@duhengforever
Copy link
Contributor Author

@klaus-pd broker will resend the check request when the producer restart util reached a limit.

@vongosling vongosling added this to the 4.3.0 milestone Jul 17, 2018
for (MessageExt opMessageExt : opMsg) {
Long queueOffset = getLong(new String(opMessageExt.getBody(), TransactionalMessageUtil.charset));
log.info("Topic: {} tags: {}, OpOffset: {}, HalfOffset: {}", opMessageExt.getTopic(),
opMessageExt.getTags(), opMessageExt.getQueueOffset(), queueOffset);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

queueOffset = getLong(new String(opMessageExt.getBody(), TransactionalMessageUtil.charset));
why this can get the value of HalfOffset?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Details can be found in TransactionalMessageService#deletePrepareMessage method.

renshuaibing-aaron pushed a commit to renshuaibing-aaron/rocketmq that referenced this pull request Apr 13, 2020
JiaMingLiu93 pushed a commit to JiaMingLiu93/rocketmq that referenced this pull request May 28, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

9 participants