KAFKA-8421: Still return data during rebalance #7312
Conversation
@ableegoldman @hachikuji for reviews.
retest this please
Also cc @ConcurrencyPractitioner who has looked into this.
…turn-data-during-rebalance
retest this please
Every build has 10+ failures, mostly the …
I ran them locally and they do pass (some are indeed flaky: with 10 runs they are going to fail at least once). I will have one more commit addressing some comments and will ping again when I'm done.
Discussed a little bit offline. We need to be a little careful with how an active rebalance affects other consumer operations; specifically, a call to commitSync may now interleave with an active rebalance. I think it might also be possible to get into a bad state if we are stuck between joining and syncing. The SyncGroup request will not be retried automatically; we rely on a call to poll to retry it.
Overall, LGTM. Just a single meta-comment.
// after the long poll, we should filter the returned data if the partitions
// they belong to are no longer owned by the consumer
final Set<TopicPartition> assignedPartitions = subscriptions.assignedPartitions();
return this.interceptors.onConsume(new ConsumerRecords<>(records.entrySet()
A meta-comment here.
I think filtering the records under most conditions is redundant, since the assignment remains static unless a rebalance occurs. We only need to filter records when the assignment has actually changed, e.g. after a rebalance has finished. When the assignment is unchanged this segment of the code has no effect, yet it could be a substantial performance hit, since this code path is hot.
We probably should have some sort of check that only filters the records when the assignment has changed.
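(A minimal sketch of what such a check could look like, under the assumption that a pre-poll snapshot of the assignment is available; all names here are hypothetical, not the PR's actual code:)

import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;

// Only filter the fetched records when the assignment observed after the poll
// differs from the snapshot taken before it, so the common no-rebalance path
// pays no extra cost. Assumes the records map is mutable.
static <K, V> Map<TopicPartition, List<ConsumerRecord<K, V>>> filterIfAssignmentChanged(
        final Map<TopicPartition, List<ConsumerRecord<K, V>>> records,
        final Set<TopicPartition> assignmentBeforePoll,
        final Set<TopicPartition> assignmentAfterPoll) {
    if (!assignmentAfterPoll.equals(assignmentBeforePoll))
        records.keySet().retainAll(assignmentAfterPoll); // drop revoked partitions
    return records;
}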
That's a fair point. Originally I thought that since the returned records are in the form of Map<TopicPartition, List<ConsumerRecord<K, V>>> and we are filtering per topic-partition, not per-record, it may be okay; but if there's a better way to avoid the unnecessary stream() call we should do it.
One thing I can think of is to leverage Fetcher#clearBufferedDataForUnassignedPartitions and call it upon partition assignment changes. But since the background thread can also handle the fetch response, concurrent access on completedFetches is possible, and its iterator() is only weakly consistent: if the background thread is adding new batches to that list while the caller thread is iterating / removing from it, a batch may not get removed.
It seems that with the locking on ConsumerNetworkClient#poll the above should never happen (cc @hachikuji to confirm), in which case I think the above idea should work.
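(For reference, a minimal sketch of the weak-consistency concern, assuming completedFetches is a ConcurrentLinkedQueue as it is in Fetcher; the batch names are made up:)

import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;

public class WeaklyConsistentIteratorSketch {
    public static void main(String[] args) throws InterruptedException {
        final ConcurrentLinkedQueue<String> completedFetches = new ConcurrentLinkedQueue<>();
        completedFetches.add("revoked-tp-batch-0");

        // background thread keeps appending new batches while we iterate
        Thread background = new Thread(() -> {
            for (int i = 1; i <= 1000; i++)
                completedFetches.add("revoked-tp-batch-" + i);
        });
        background.start();

        // The iterator is weakly consistent: it never throws
        // ConcurrentModificationException, but it is not guaranteed to reflect
        // elements added after its creation, so some batches may survive the sweep.
        Iterator<String> it = completedFetches.iterator();
        while (it.hasNext()) {
            it.next();
            it.remove();
        }
        background.join();
        System.out.println("batches left after the sweep: " + completedFetches.size());
    }
}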
Actually, I think this check is not needed, since in Fetcher#fetchRecords we already do this filtering:
if (!subscriptions.isAssigned(completedFetch.partition)) {
// this can happen when a rebalance happened before fetched records are returned to the consumer's poll call
log.debug("Not returning fetched records for partition {} since it is no longer assigned",
completedFetch.partition);
} else if (!subscriptions.isFetchable(completedFetch.partition)) {
// this can happen when a partition is paused before fetched records are returned to the consumer's
// poll call or if the offset is being reset
log.debug("Not returning fetched records for assigned partition {} since it is no longer fetchable",
completedFetch.partition);
}
And since the assignment is ONLY updated on the caller thread, and never on the background thread, we are safe to use this field to filter the fetched records, which are only returned to the caller thread, and hence ordering is guaranteed.
Ah, I see.
Edit: Actually, upon closer inspection, it appears I had the order mixed up. Looks like everything is accounted for; that was my mistake.
throw new IllegalArgumentException("Attempt to dynamically assign partitions while manual assignment in use");

Map<TopicPartition, TopicPartitionState> assignedPartitionStates = partitionToStateMap(assignments);
Nit: should this be declared final?
Ack.
@@ -222,21 +222,45 @@ public synchronized boolean assignFromUser(Set<TopicPartition> partitions) {
if (this.assignment.partitionSet().equals(partitions))
    return false;

Map<TopicPartition, TopicPartitionState> assignedPartitionStates = partitionToStateMap(partitions);
Nit: should declare final as well?
Ack.
…turn-data-during-rebalance
@hachikuji Here are my thoughts about the interleaving of rebalance and commit requests:
2.a) The group is still in … Note that if the join-group failed with a fatal error, the commit request would also fail with the same error (confirmed in the code); if the join-group failed with a retriable error, the commit request would also fail with that error. Among them:
2.b) The group is already in …
3.a) The group has already transited to … With the incremental protocol this is fine, since those owned partitions would not immediately be re-assigned to others in that rebalance; with the eager protocol we just need to make sure that nothing gets sent in offset commits, since nothing is owned. 3.b) The group has not transited, and is still in …
So in sum, the commitSync call should not be stuck while interleaving with the rebalance, but it may indeed fail with CommitFailed (cases 2.b and 3.b). In many callers, CommitFailed is treated as a fatal error: for example, in Streams it is translated into TaskMigrated and may cause unnecessary error handling. However, in those cases it is not necessarily a fatal error. I'd propose we do the following: a. Filter the passed-in offsets map by the assigned partitions (this is to help case 3.a). WDYT?
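(A minimal sketch of proposal (a), restricting the offsets map to partitions that are still assigned before committing; all names are hypothetical, not the PR's actual change:)

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Restrict the passed-in offsets map to the currently assigned partitions, so
// that nothing is committed for partitions the consumer no longer owns.
static Map<TopicPartition, OffsetAndMetadata> filterToAssigned(
        final Map<TopicPartition, OffsetAndMetadata> offsets,
        final Set<TopicPartition> assignedPartitions) {
    final Map<TopicPartition, OffsetAndMetadata> filtered = new HashMap<>(offsets);
    filtered.keySet().retainAll(assignedPartitions);
    return filtered;
}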
@ableegoldman @hachikuji @RichardYuSTUG I've updated the PR with the above analysis; please take another look.
…turn-data-during-rebalance
Failed tests pass locally, retest this please
retest this please
// since even if we are 1) in the middle of a rebalance or 2) have partitions
// with unknown starting positions we may still want to return some data
// as long as there are some partitions fetchable
updateAssignmentMetadataIfNeeded(time.timer(1L));
This is the tricky part: I have to use a non-zero timer so that a blocking RPC like find-coordinator is guaranteed to poll at least once; otherwise future.hasExpired would trigger immediately and we would be doomed to never finish that call.
I am wondering about two things here:
- How robust is this solution? Is it guaranteed that with the timer set to 1 the call is done?
- If I look at kafka/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java, lines 210 to 215 in d112ffd:

public boolean poll(RequestFuture<?> future, Timer timer) {
    do {
        poll(timer, future);
    } while (!future.isDone() && timer.notExpired());
    return future.isDone();
}
If the timer is set to zero, we would not send the request at all -- note that the client.send call just queues up the request, and only client.poll writes it to the socket; that's why I cannot set the timer to 0. Setting it to 1 should be robust, since the timer is initialized at the beginning of the call and is not checked until the first client.poll(), so it would not expire early.
I've further polished it to only use timer(1) if the remaining time is still > 0; this way we still make sure that consumer.poll(0) can return instantaneously.
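(A sketch of that polish, assuming access to the consumer's Time and the caller's Timer; a hypothetical helper, not the actual diff:)

import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;

// Use a 1 ms timer so that a queued request gets at least one network poll,
// but only when the caller still has time left; with a zero-remaining caller
// timer, poll(0) must stay instantaneous.
static Timer metadataUpdateTimer(final Time time, final Timer callerTimer) {
    return callerTimer.remainingMs() > 0 ? time.timer(1L) : time.timer(0L);
}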
if (generation == null) {
    log.info("Failing OffsetCommit request since the consumer is not part of an active group");
-   return RequestFuture.failure(new CommitFailedException());
+   return RequestFuture.failure(new RebalanceInProgressException("Offset commit cannot be completed since the " +
This is the main proposal for returning a different exception.
Is it always the case that if generation is null, it's due to a rebalance in progress? Or do we want to keep the CommitFailedException and add to the exception message that the failure could be due to a rebalance?
This is a good question; I would replace this case with the RetriableCommitFailed exception (see my other comment below).
*/
requestRejoin();
future.raise(new RebalanceInProgressException());
This is the main proposal for returning a different exception.
@guozhangwang Thanks for the PR.
I did a first pass.
consumer.poll(Duration.ZERO);

assertEquals(Utils.mkSet(topic, topic2), consumer.subscription());
assertEquals(Utils.mkSet(tp0, t2p0), consumer.assignment());
Do you really need these checks to verify your code?
I want to make sure my edits on updateAssignmentMetadataIfNeeded did not change any existing logic.
…turn-data-during-rebalance
// it's possible that the partition is no longer assigned when the response is received,
// so we need to ignore seeking if that's the case
if (this.subscriptions.isAssigned(tp))
@hachikuji This is the change I made for auto offset reset.
I thought about adding a unit test, but since the logic is wrapped in a single refreshCommittedOffsetsIfNeeded call, it is hard to change the subscription in between without breaking it into multiple calls, and I felt it was too messy to test a single monolithic function, so I did not add one.
Does it make sense to move this check up a little bit?
if (offsetAndMetadata != null && subscriptions.isAssigned(tp)) {
That will make the log message less confusing.
Hmm, but I thought even if the partition is no longer assigned, we may still want to update its epoch; on the other hand your concern is valid. Will tweak it a bit more.
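(A sketch of the tweak being discussed, mirroring the snippets elsewhere in this PR: always propagate the epoch, but only seek, and log, when the partition is still assigned. A fragment only; it relies on the surrounding refreshCommittedOffsetsIfNeeded context, and tp stands for entry.getKey():)

entry.getValue().leaderEpoch().ifPresent(epoch -> this.metadata.updateLastSeenEpochIfNewer(tp, epoch));

// it's possible that the partition is no longer assigned when the response
// is received, so only seek (and log) for partitions we still own
if (this.subscriptions.isAssigned(tp)) {
    this.subscriptions.seekUnvalidated(tp, position);
    log.info("Setting offset for partition {} to the committed offset {}", tp, position);
} else {
    log.info("Ignoring the fetched committed offset for partition {} since it is no longer assigned", tp);
}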
* This can occur if, e.g. consumer instance is in the middle of a rebalance so it is not yet determined
* which partitions would be assigned to the consumer yet. In such cases you can first complete the rebalance
* by calling {@link #poll(Duration)} and retry committing offsets again. NOTE when you retry after the
* * rebalance the assigned partitions may have changed, and also for those partitions that are still assigned
This javadoc needs to be fixed
Fixed.
@hachikuji it is ready for another look.
Thanks, left a few more comments.
} catch (final RetriableCommitFailedException error) {
    // commitSync throws this error and can be ignored (since EOS is not enabled, even if the task crashed
    // immediately after this commit, we would just reprocess those records again)
    log.info("Committing failed with a non-fatal error, we can ignore this since commit may succeed still");
Just in case, it's probably a good idea to include the exception in the message.
final RequestFuture<Void> future = lookupCoordinator();
client.poll(future, timer);

if (!future.isDone()) {
    // ran out of time
    future.addListener(new RequestFutureListener<Void>() {
Do we want this listener only when coordinator lookup is triggered through ensureCoordinatorReady? Other paths may use the future from a call to ensureCoordinatorReady. Conversely, we may use a future which was sent through another path here. Could we instead move this logic to lookupCoordinator so that it is handled consistently?
@hachikuji I actually did it intentionally: if we move the logic into the callee, then any client.poll may trigger the callback and potentially throw the exception, rather than just consumer.poll as we are testing in the unit test. Currently there are three callers of lookupCoordinator, and AFAIK the other two do not need to check whether the future failure is non-retriable and hence needs to be thrown. Of course, if in the future we add another caller which does want to check, then this would be vulnerable.
So I'd propose that if we make this behavior consistent inside lookupCoordinator, then we need to mark all public APIs that may lead to it with @throws for those exceptions (today consumer.poll and some other APIs have the marks, but not all of them). WDYT?
Regardless of where the lookupCoordinator is triggered, we are only raising it from ensureCoordinatorReady, so I am not sure I follow the point about raising from other contexts.
Note there doesn't appear to be any logic preventing multiple listeners from getting attached to the future. I think it would be better to always attach the listener when the future is created.
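(A sketch of that suggestion, with simplified signatures: attach the listener once, when the future is created inside lookupCoordinator, so every path is handled consistently. sendFindCoordinatorRequest stands in for the actual node lookup and send:)

protected synchronized RequestFuture<Void> lookupCoordinator() {
    if (findCoordinatorFuture == null) {
        findCoordinatorFuture = sendFindCoordinatorRequest();
        findCoordinatorFuture.addListener(new RequestFutureListener<Void>() {
            @Override
            public void onSuccess(Void value) {
                // nothing to stash on success
            }

            @Override
            public void onFailure(RuntimeException e) {
                // stash the failure so the caller thread can rethrow it on its next poll
                findCoordinatorException = e;
            }
        });
    }
    return findCoordinatorFuture;
}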
log.info("Setting offset for partition {} to the committed offset {}", tp, position);
}
nit: remove newlines
// need to reset generation and re-join group
resetGenerationOnResponseError(ApiKeys.OFFSET_COMMIT, error);
future.raise(new CommitFailedException());
return;
} else if (error == Errors.ILLEGAL_GENERATION) {
    if (this.generation.equals(ConsumerCoordinator.this.generation())) {
How much effort would it be to write a test case which hits this path?
I tried to add the test but found it may not actually be a valid case: we think this can happen when a join request is sent, then a commit request is sent, then a join response is received, and then a commit response is received, all on the same socket.
However, when a join request is sent we already transition to the REBALANCING state, and then in sendOffsetCommitRequest above (https://github.com/apache/kafka/pull/7312/files#diff-e9c1ee46a19a8684d9d8d8a8c77f9005R1067) we would immediately fail with a RetriableCommitFailure exception: if called from an async commit, it would be marked as a failure; if called from a sync commit, that exception would be thrown.
So it sounds to me that we do not need this specific handling since we should actually never hit this scenario?
consumer.assign(List(tp, tp2).asJava)
sendRecords(producer, numRecords, tp2)
var topic2RecordConsumed = false
I wonder if we really need to be testing with two separate topics here. We already have a hard time with the flakiness of this test.
…turn-data-during-rebalance
…turn-data-during-rebalance
@hachikuji I have made another commit, which moves back to …
Looked into …
Thanks, left a few more comments.
* @throws org.apache.kafka.clients.consumer.RetriableCommitFailedException if the commit failed but can be retried.
*         This can occur if, e.g. the consumer instance is in the middle of a rebalance so it is not yet determined
*         which partitions would be assigned to the consumer. In such cases you can first complete the rebalance
*         by calling {@link #poll(Duration)} and retry committing offsets again. NOTE when you retry after the
I think we should not say that the commit can be retried. I would just say that the rebalance needs to be completed by calling poll() and that the offsets to commit can be reconsidered after the group is rejoined.
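(A hedged sketch of what that guidance means for a caller; the helper name and the recompute step are illustrative, not part of this PR:)

import java.time.Duration;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RebalanceInProgressException;

static void commitOrRejoin(final Consumer<String, String> consumer,
                           final Map<TopicPartition, OffsetAndMetadata> offsets) {
    try {
        consumer.commitSync(offsets);
    } catch (final RebalanceInProgressException e) {
        // complete the in-flight rebalance first; the assignment may change
        consumer.poll(Duration.ZERO);
        // the caller should now reconsider which offsets to commit for the
        // partitions it still owns, rather than retrying the same map blindly
    }
}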
log.debug("Committing offsets: {}", offsets); | ||
offsets.forEach(this::updateLastSeenEpochIfNewer); | ||
coordinator.commitOffsetsAsync(new HashMap<>(offsets), callback); | ||
} catch (CommitFailedException e) { | ||
log.error("Failed to commit offsets asynchronously because they do not belong to dynamically assigned partitions"); |
Do we still need this catch since we reverted the logic that verified only assigned partitions can be committed?
public RetriableCommitFailedException(Throwable t) {
    super("Offset commit failed with a retriable exception. You should retry committing " +
        "the latest consumed offsets.", t);
}

public RetriableCommitFailedException(String message) {
Hmm... are we safe to remove these, given that this is a public API? It's probably unlikely anyone is using them, but still...
Oh, I was not sure public classes counted as part of our API "contract" :P Anyhow, I don't feel strongly about it and I can revert.
@@ -130,11 +130,12 @@
    private MemberState state = MemberState.UNJOINED;
    private HeartbeatThread heartbeatThread = null;
    private RequestFuture<ByteBuffer> joinFuture = null;
    private RequestFuture<Void> findCoordinatorFuture = null;
    private RuntimeException findCoordinatorException = null;
We probably need to either make this volatile or an AtomicReference.
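(A minimal sketch of the hand-off being discussed, assuming the exception is set from the background/heartbeat thread and consumed on the caller thread; class and method names are hypothetical:)

import java.util.concurrent.atomic.AtomicReference;

public class CoordinatorExceptionHandoff {
    // written by the background thread, read and cleared by the caller thread;
    // AtomicReference (or a volatile field) guarantees visibility of the write,
    // while a plain field does not
    private final AtomicReference<RuntimeException> findCoordinatorException =
            new AtomicReference<>(null);

    // background thread: record a fatal lookup failure
    void onLookupFailure(final RuntimeException e) {
        findCoordinatorException.compareAndSet(null, e);
    }

    // caller thread: rethrow the stashed failure on the next poll, then clear it
    void maybeThrowLookupFailure() {
        final RuntimeException e = findCoordinatorException.getAndSet(null);
        if (e != null)
            throw e;
    }
}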
entry.getValue().leaderEpoch().ifPresent(epoch -> this.metadata.updateLastSeenEpochIfNewer(entry.getKey(), epoch));
this.subscriptions.seekUnvalidated(tp, position);

// it's possible that the partition is no longer assigned when the response is received,
Do you think it's worth adding a debug message saying that we're ignoring the fetched offset?
@@ -19,19 +19,7 @@
public class RebalanceInProgressException extends ApiException {
    private static final long serialVersionUID = 1L;

    public RebalanceInProgressException() {
Same here. I think we should probably leave these constructors around. They are not really doing any harm.
@@ -22,8 +22,22 @@
    private static final long serialVersionUID = 1L;

    public static RetriableCommitFailedException withUnderlyingMessage(String additionalMessage) {
I'm ok removing this API in spite of the compatibility concern. It's just that the other constructors are "standard" exception constructors and we have no real need to remove them.
@@ -274,8 +264,20 @@ public void onFailure(RuntimeException e) {
if (node == null) {
    log.debug("No broker available to send FindCoordinator request");
    return RequestFuture.noBrokersAvailable();
-} else
+} else{
nit: this probably breaks checkstyle
@@ -769,6 +769,9 @@ public boolean refreshCommittedOffsetsIfNeeded(Timer timer) {
this.subscriptions.seekUnvalidated(tp, position);

log.info("Setting offset for partition {} to the committed offset {}", tp, position);
} else {
    log.info("Ignoring the returned {} since its partition {} is no longer assigned",
nit: I'd suggest "Ignoring the fetched committed offset"
I originally did that, but then I realized OffsetAndMetadata#toString contains this wording already, so I decided to avoid duplication.
@guozhangwang LGTM, thanks for carrying this through. Note there is a failure in testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup which is probably worth checking out before we merge.
Re-ran the failed tests locally and they seem stable (30+ runs); will create the flaky-test JIRA and merge.
After #7312, we could still return data during the rebalance phase, which means it is possible to find records without corresponding tasks. We have to fall back to the unsubscribe mode when a task is migrated, as the assignment should be cleared out to stay in sync with the task manager state. Reviewers: A. Sophie Blee-Goldman <[email protected]>, Guozhang Wang <[email protected]>
- Do not wait until updateAssignmentMetadataIfNeeded returns true; only call it once with a 0 timeout. Also, do not return empty if in the middle of a rebalance.
- Trim the pre-fetched records after the long poll, since the assignment may have changed.
- Update SubscriptionState to retain the existing state in assignFromSubscribed if it already exists (similar to assignFromUser), so that we do not need the transition from INITIALIZING to FETCHING.
- Unit tests: this actually took me the most time :)