KAFKA-7415; Persist leader epoch and start offset on becoming a leader #5678
Conversation
@hachikuji : Thanks for the patch. Looks good overall. Just a few minor comments below.
    }
    epochs += entryToAppend

    debug(s"Appended new epoch entry $entryToAppend. Cache now contains ${epochs.size} entries.")
Since this is not expected, should we log at WARN or ERROR instead of DEBUG?
    (requestedEpoch, leo().messageOffset)
    // The latest epoch is always the current epoch that is still being written to. Followers
    // should not have any reason to query the latest epoch since truncation always takes place
    // after the epoch is incremented.
Is this comment accurate? When a follower is restarted and the leader isn't, the follower actually will ask for the end offset for the latest epoch.
Let me try to clarify. The point of this patch is to ensure that the cache always contains the latest epoch following a leader election. Let's say there is a leader election and the new epoch is 5. The cache will reflect epoch 5 with a starting offset equal to the log end offset at the time the replica became a leader. Followers would not have any need to send OffsetsForLeaderEpoch queries for epoch 5 itself, only for the epochs prior to 5.
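To make the intent concrete, here is a minimal standalone sketch of the idea (EpochEntry mirrors the case class in the patch, but onBecomeLeader and the surrounding class are hypothetical stand-ins for illustration, not the actual Partition or LeaderEpochFileCache API):

    import scala.collection.mutable.ListBuffer

    case class EpochEntry(epoch: Int, startOffset: Long)

    // Simplified model of the leader epoch cache, for illustration only.
    class EpochCacheSketch {
      private val epochs = ListBuffer.empty[EpochEntry]

      // On becoming leader for `newEpoch`, record the epoch with the log end
      // offset at election time as its starting offset.
      def onBecomeLeader(newEpoch: Int, logEndOffset: Long): Unit =
        epochs += EpochEntry(newEpoch, logEndOffset)

      def latest: Option[EpochEntry] = epochs.lastOption
    }

If an election bumps the epoch to 5 while the log end offset is 100, the cache gains (epoch=5, startOffset=100); a follower that was on epoch 4 sends OffsetsForLeaderEpoch for epoch 4 and truncates to the returned end offset.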
    - if (subsequentEpochs.isEmpty || requestedEpoch < epochs.head.epoch)
    -   // no epochs recorded or requested epoch < the first epoch cached
    + if (subsequentEpochs.isEmpty) {
    +   // The epoch is larger than any known epoch. This case should never be hit because the
The epoch => The requested epoch? Ditto in a few other places.
      inReadLock(lock) {
        val epochAndOffset =
          if (requestedEpoch == UNDEFINED_EPOCH) {
    -       // this may happen if a bootstrapping follower sends a request with undefined epoch or
    +       // This may happen if a bootstrapping follower sends a request with undefined epoch or
With this change, it seems that epochs should never be empty when used?
Yeah, I kept this for upgrade compatibility.
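For context, the fallback being discussed looks roughly like this (a simplified standalone model of the lookup, not the real LeaderEpochFileCache; the local sentinel values stand in for UNDEFINED_EPOCH and its offset counterpart):

    object EpochLookupSketch {
      val UndefinedEpoch: Int = -1
      val UndefinedOffset: Long = -1L

      case class EpochEntry(epoch: Int, startOffset: Long)

      def endOffsetFor(requestedEpoch: Int,
                       epochs: Seq[EpochEntry],
                       logEndOffset: Long): (Int, Long) =
        if (requestedEpoch == UndefinedEpoch || epochs.isEmpty)
          // Bootstrapping follower, or an empty cache left over from a
          // pre-upgrade broker: return the sentinel so the follower falls
          // back to truncating to its high watermark.
          (UndefinedEpoch, UndefinedOffset)
        else if (requestedEpoch == epochs.last.epoch)
          // The requested epoch is the one still being written to.
          (requestedEpoch, logEndOffset)
        else
          // Otherwise the end offset is the start offset of the next
          // higher cached epoch.
          epochs.find(_.epoch > requestedEpoch)
            .map(next => (requestedEpoch, next.startOffset))
            .getOrElse((UndefinedEpoch, UndefinedOffset))
    }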
    - if (epoch >= 0 && epoch > latestEpoch && offset >= latestOffset) {
    -   info(s"Updated PartitionLeaderEpoch. ${epochChangeMsg(epoch, offset)}. Cache now contains ${epochs.size} entries.")
    -   epochs += EpochEntry(epoch, offset)
    + if (epoch != latestEpoch) {
In the case when epoch == latestEpoch, if the startOffset is lower than what's in the cache, should we update the epoch cache with the lower offset for consistency?
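To make the scenario concrete (hypothetical values, not from the patch):

    // Suppose the cache ends with EpochEntry(epoch = 5, startOffset = 20).
    val latestEpoch = 5
    // A new assignment arrives for the same epoch with a lower start offset.
    val (epoch, offset) = (5, 10L)

    // Under the check above nothing is appended, since the epoch matches,
    // so the cache keeps startOffset = 20 even though 10 is lower.
    val appended = epoch != latestEpoch // false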
      baseOffset = startOffset, partitionLeaderEpoch = epoch)
    }

    val logConfig = LogTest.createLogConfig(segmentBytes = 10 * createRecords(0, 0).sizeInBytes)
Not sure if this truly matters, but does createRecords(0, 0).sizeInBytes match the record size when records are created with different offsets?
Hmm.. I think they should be the same. The batch base offset is a fixed size and the first record will always have a relative offset of 0.
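For what it's worth, this is easy to check in isolation (a quick standalone check against the public MemoryRecords API from kafka-clients; not part of the patch):

    import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}

    object RecordSizeCheck extends App {
      // The batch base offset is a fixed-width header field and the first
      // record in a batch always has offset delta 0, so the serialized size
      // should not depend on the base offset.
      def batch(baseOffset: Long): MemoryRecords =
        MemoryRecords.withRecords(baseOffset, CompressionType.NONE,
          new SimpleRecord("key".getBytes, "value".getBytes))

      println(batch(0L).sizeInBytes == batch(1000000L).sizeInBytes) // expected: true
    }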
@hachikuji : Thanks for the updated patch. LGTM
KAFKA-7415; Persist leader epoch and start offset on becoming a leader (#5678)

This patch ensures that the leader epoch cache is updated when a broker becomes leader, recording the latest epoch with the log end offset as its starting offset. This guarantees that the leader will be able to provide the right truncation point even if the follower has data from leader epochs which the leader itself does not have. This situation can occur when there are back-to-back leader elections.

Additionally, we have made the following changes:

1. The leader epoch cache enforces monotonically increasing epochs and starting offsets among its entries. Whenever a new entry is appended which violates this requirement, we remove the conflicting entries from the cache.
2. Previously we returned an unknown epoch and offset if an epoch was queried which comes before the first entry in the cache. Now we return the requested epoch with the start offset of the earliest cached entry as its end offset. For example, if the earliest entry in the cache is (epoch=5, startOffset=10), then a query for epoch 4 will return (epoch=4, endOffset=10). This ensures that followers (and consumers in KIP-320) can always determine where the correct starting point is for the active log range on the leader.

Reviewers: Jun Rao <[email protected]>
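A minimal sketch of change (1), the conflict handling (a simplified standalone model, not the actual cache code):

    import scala.collection.mutable.ListBuffer

    object MonotonicCacheSketch {
      case class CachedEpoch(epoch: Int, startOffset: Long)

      // Before appending, drop any cached entries that would break the
      // monotonicity of epochs and start offsets.
      def assign(epochs: ListBuffer[CachedEpoch], entry: CachedEpoch): Unit = {
        epochs --= epochs.filter(e => e.epoch >= entry.epoch || e.startOffset >= entry.startOffset)
        epochs += entry
      }
    }

For example, a cache holding (4, 50), (5, 80) that receives (epoch=5, startOffset=60) drops the conflicting (5, 80) and ends up as (4, 50), (5, 60).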