
Message offset adjust ticket #1010 #1017

Merged

Conversation

breadpowder

This is for ticket id #1010 regarding offset.

The offset validation logic is refactored a bit to work in both transactional and non-transactional modes, because:

  1. in transactional mode, offsets do not advance by exactly +1 between consecutive messages
  2. in non-transactional mode, there is a small chance a message gets delivered more than once due to retries

The offset logic in the message writer now works as follows: if the offset of the message currently being processed is not greater than the "last seen offset" (the highest offset already written), a rebalance occurred and/or the broker is replaying messages from the last committed offset or failure point. Since the messages up to that offset have already been written, we can safely trim those messages to remove duplicates.
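A minimal, dependency-free sketch of that trimming check (the `OffsetTracker` and `shouldSkip` names are illustrative only, not Secor's actual API):

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of the duplicate-trimming check described above.
public class OffsetTracker {
    // last offset already written, keyed by a "topic-partition" string
    private final Map<String, Long> lastSeenOffsets = new HashMap<>();

    /**
     * Returns true if the message is a duplicate (replayed after a rebalance
     * or a failure) and should be trimmed instead of written.
     */
    public boolean shouldSkip(String topicPartition, long offset) {
        Long lastSeen = lastSeenOffsets.get(topicPartition);
        if (lastSeen != null && offset <= lastSeen) {
            return true;  // already written; safe to drop
        }
        lastSeenOffsets.put(topicPartition, offset);
        return false;
    }
}
```

Note the check is `offset <= lastSeen` rather than requiring exact +1 progression, which is what makes it safe for transactional mode, where offsets can legitimately jump ahead.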

The existing commit-offset validation logic in the upload class remains intact. It ensures that after a rebalance, if another consumer has made progress on the same topic/partition, we either trim those files or trim the corresponding completed offsets.

@HenryCaiHaiying
Contributor

There are too many files changed in this PR. Please check your coding style: we don't want wildcard imports; each individual package needs to be explicitly specified and imported.

@breadpowder breadpowder force-pushed the message-offset-adjust-1010 branch 2 times, most recently from 564b436 to 4a2f234 Compare October 18, 2019 19:16
@breadpowder
Author

breadpowder commented Oct 18, 2019

Done. Please review the latest one. It is rebased on the most recent master, which keeps my commits easy to read.

Contributor

@HenryCaiHaiying HenryCaiHaiying left a comment


I don't think the fix needs to be that complicated.

To distinguish the case between:

  1. offset progression is no longer sequential because a consumer rebalance happened
  2. offset progression jumps because of transactional messages

We can use ConsumerRebalanceListener to listen for consumer rebalance events. We already use SecorKafkaMessageIterator for those callback events; we can add a bit more logic there to clear the lastSeenOffsets as well as remove the underlying files if the partition assignment changes.

If we do that cleanup in the ConsumerRebalanceListener, we can simply change != to < in the adjustOffset() method.
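A dependency-free sketch of that cleanup idea (in the real code this would run inside the ConsumerRebalanceListener callbacks; here partitions are plain "topic-partition" strings so the logic is runnable as-is, and all names are illustrative):

```java
import java.util.*;

// Illustrative sketch: forget per-partition state on rebalance so stale
// local data cannot create offset gaps.
public class RebalanceCleanup {
    private final Map<String, Long> lastSeenOffsets = new HashMap<>();
    private final Set<String> localFiles = new HashSet<>();

    public void recordWrite(String partition, long offset, String file) {
        lastSeenOffsets.put(partition, offset);
        localFiles.add(file);
    }

    // Called when the partition assignment changes: clear the last-seen
    // offsets and delete local files for the revoked partitions.
    public void onPartitionsRevoked(Collection<String> revoked) {
        for (String p : revoked) {
            lastSeenOffsets.remove(p);
            localFiles.removeIf(f -> f.startsWith(p + "/"));
        }
    }

    public boolean hasState(String partition) {
        return lastSeenOffsets.containsKey(partition);
    }
}
```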

@breadpowder
Author

breadpowder commented Oct 21, 2019

How about a slight enhancement over the above approach? Instead of removing the underlying files when a rebalance happens, let's upload the files and commit the offset in the partition-revoked notification. That way, consumers don't need to rewrite the same data.

@HenryCaiHaiying
Contributor

If a rebalance happens, the files the current consumer was working on are no longer the source of truth; the other consumer may already have uploaded all or part of those offsets to S3. There is another situation: a consumer was working on partition 0 for some time, then rebalance event 1 happened and it lost partition 0, but it still kept some unfinished partition 0 data on the local filesystem. After some time, rebalance event 2 happened and it got partition 0 back, but the offset had already jumped ahead. If it keeps appending the new partition 0 messages to the local files, those files will have gaps in their message offsets. So it's best to clear all local files when an offset inconsistency is discovered.

The messages are not lost, because some other consumer has already uploaded those offsets to S3.

Transactional messages are a bit of a different story; that's why we need to distinguish that case from a consumer rebalance.

@breadpowder
Author

breadpowder commented Oct 21, 2019

Thank you! I understand and agree with your points about the other consumer taking over, and about the possibility of an offset jump when a previously held partition is assigned back.

My thinking is that we don't need to clear the local files to ensure consistency. Instead, we can upload and commit the offset in the "partition revoked" callback, given the Kafka ConsumerRebalanceListener API's promises (http://kafka.apache.org/20/javadoc/index.html?org/apache/kafka/clients/consumer/ConsumerRebalanceListener.html). The API documentation says: "this method will be called before a rebalance operation starts and after the consumer stops fetching data. It is recommended that offsets should be committed in this callback to either Kafka or a custom offset store to prevent duplicate data".

Therefore, we can safely upload, commit the offset, and clear the last-seen offset after processing the existing message buffer in the partition-revoked callback. It won't cause a data inconsistency issue, because the other consumer will start from the newly committed offset; the consumers work sequentially without any message overlap.

The benefit is that the other consumer doesn't need to reprocess the same data. Let me know your thoughts. Thank you again for sharing them!
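A dependency-free sketch of that revoke handling (in the real code this would run inside `ConsumerRebalanceListener.onPartitionsRevoked()`; the `Uploader` and `Committer` interfaces here are hypothetical stand-ins for Secor's upload and offset-commit machinery):

```java
import java.util.*;

// Illustrative sketch: upload finished files and commit their offsets
// before partition ownership moves, so the next consumer resumes from the
// committed point without redoing work.
public class RevokeHandler {
    public interface Uploader { void upload(String partition); }
    public interface Committer { void commit(String partition, long offset); }

    private final Map<String, Long> lastSeenOffsets = new HashMap<>();
    private final Uploader uploader;
    private final Committer committer;

    public RevokeHandler(Uploader uploader, Committer committer) {
        this.uploader = uploader;
        this.committer = committer;
    }

    public void recordWrite(String partition, long offset) {
        lastSeenOffsets.put(partition, offset);
    }

    public void onPartitionsRevoked(Collection<String> revoked) {
        for (String p : revoked) {
            Long offset = lastSeenOffsets.remove(p);
            if (offset != null) {
                uploader.upload(p);
                // commit the next offset to read, per Kafka convention
                committer.commit(p, offset + 1);
            }
        }
    }
}
```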

@HenryCaiHaiying
Contributor

HenryCaiHaiying commented Oct 21, 2019 via email

@breadpowder
Author

Sure. will commit soon.

@HenryCaiHaiying
Contributor

HenryCaiHaiying commented Oct 21, 2019 via email

@breadpowder
Author

As discussed, the rebalance logic has been added to SecorConsumerRebalanceListener and RebalanceHandler. Please review.

Contributor

@HenryCaiHaiying HenryCaiHaiying left a comment


In general, the code change looks fine, but there are too many lines changed, which makes the diff difficult to review. I wasn't quite sure why you needed to restructure the class/inner classes in SecorKafkaMessageIterator; it seems you could just add the new reset logic in the ConsumerRebalanceListener inner class.

} else {
    if (offset < lastSeenOffset + 1) {
        LOG.warn("offset for topic {} partition {} goes back from {} to {}",
                topicPartition.getTopic(), topicPartition.getPartition(), lastSeenOffset, offset);
Contributor


How about we throw an exception here? I don't think we should ever see this happen with the code change in the rebalance listener.

Author

@breadpowder breadpowder Oct 23, 2019


For compatibility with LegacyKafkaMessageIterator: since no such listener exists there, we can't throw an exception here. Any ideas?

Author

@breadpowder breadpowder Oct 23, 2019


I would prefer a separate class for the RebalanceListener, since the rebalance and handler logic is a separate concern from the message iterator, so it's better off as a completely separate unit.

Author


Let me know if the above is OK; I will push shortly.

@breadpowder
Author

breadpowder commented Oct 23, 2019

I just happened to find that this issue is also related to another ticket, #682.

@HenryCaiHaiying HenryCaiHaiying merged commit 5c51d7e into pinterest:master Oct 25, 2019
@HenryCaiHaiying
Contributor

lgtm, thanks for the effort.
