KAFKA-7026: Sticky Assignor Partition Assignment Improvement (KIP-341) #5291
Conversation
Hello Vahid,
I have some reservations about your solution, as I fear it will not work if the leader changes (or, even worse, when a previous leader becomes leader again).
Maybe I am wrong, but I think a different kind of solution is needed. If you want, I can take a stab at it.
currentAssignment.put(member, deserializedAssignment);
This caching logic might work if the leader does not change.
But I have the impression that it is possible to end up with the same problem when the leader changes, because as far as I can see, the new leader will not have built up its cache (or, even worse, has an outdated cache from a previous time it was leader).
Which I think can still cause trouble.
I see two other possible solutions:
1. The easiest one to implement, I think, is to keep the previous code but change the type of the currentAssignment parameter from a Map<String, List<TopicPartition>> to a Map<TopicPartition, String>, or keep the type but do a post-processing step on this list where duplicate TopicPartitions are removed before doing the sortPartitions.
2. Another possibility is putting a generation counter in the schema, and only keeping assignments with the maximum generation counter (or those where it is still missing, for backwards compatibility).
The first possible solution is easy to implement but makes it a little less sticky (which I do not mind).
The second solution is more sound, but implementing it in a backwards-compatible way is a challenge.
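The duplicate detection behind option 1 can be sketched as follows. This is a hypothetical, simplified illustration: plain Strings stand in for TopicPartition, and `DuplicateCheck`/`findDuplicates` are invented names, not the assignor's actual code.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class DuplicateCheck {
    // Sketch of the post-processing step: scan every consumer's reported
    // assignment and collect each partition claimed by more than one consumer.
    public static Set<String> findDuplicates(Map<String, List<String>> currentAssignment) {
        Set<String> seen = new HashSet<>();
        Set<String> duplicates = new HashSet<>();
        for (List<String> partitions : currentAssignment.values())
            for (String partition : partitions)
                if (!seen.add(partition))   // add() returns false on a repeat
                    duplicates.add(partition);
        return duplicates;
    }
}
```

The duplicates found this way could then be stripped before sortPartitions runs, at the cost of some stickiness.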
@steven-aerts thanks for the feedback. The provided solution, based on some tests I ran, should resolve the bug in the use case described in the JIRA (a leader pausing for a bit, another consumer becoming the leader, and then the old leader rejoining). Can you describe more concretely a scenario that still exposes a bug in the assignor?
My main problem with this solution is that you keep state only on the leader. The other consumers do not have any state, which means that if any of them becomes leader, I think it is theoretically possible to see this issue again. To give you a concrete scenario, let's start with a schematic representation of the bug.
G2: c3 is unreachable and, according to c1/the coordinator, leaves the group:
G3: c3 comes back with its subscription from G1; combined with the G1 subscriptions of G2 for c1 and c2, this leads to a wrong assignment:
Now a case where your solution works:
G1: c3 joins the group and gets an assignment:
G2: c3 is unreachable and, according to c1/the coordinator, leaves the group:
G3: c3 comes back with its subscription from G1, but the cache drops its subscription data:
Where I think it breaks:
But if in the meanwhile another consumer becomes leader, you have a problem, as G3 will then look like this (the cache will not prevent the double assignment):
So my main problem is that, in the end, you are still able to assign a partition twice, as duplicates are not filtered anywhere.
@steven-aerts Thanks for describing those scenarios in detail. Though, regarding the last one (where you think it breaks), I'm not sure how the group can end up in that state. The scenario you described would imply that at some point… I tried to rebuild the scenario with that in mind, but couldn't reproduce the issue (see below).
G1:
G2:
G3:
G4:
@vahidhashemian Then G3 should look like this, where partition 6 still has a double assignment:
For this to work you still need two operations in one generation. Reproducing it by hand by pausing debuggers might be tricky. What I would do to try to reproduce the scenario above is pause the coordinator (use a Kafka cluster of 1), unpause…
Good luck, Steven
@steven-aerts I understand. There are corner cases where the issue can resurface, even with this PR. I'll try to think about ways to improve this solution. Between the two options you suggested, I like the second one best (using generations), as the first one could jeopardize stickiness (we should avoid randomly removing duplicates). I'll think about this some more. Thanks again for pointing out this issue.
I think there is a variant of option 1 which also ensures stickiness in (almost?) all cases and will be easier to implement than option two. If you detect duplicates, you can loop over all subscriptions, find the subscription with the most duplicates, remove all duplicate partition assignments from that subscription, and test again. What I propose is something like this:

    for (;;) {
        Set<TopicPartition> doubleAssigned = currentAssignment.values().stream()
            .flatMap(List::stream)
            .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()))
            .entrySet().stream()
            .filter(e -> e.getValue() > 1)
            .map(Map.Entry::getKey)
            .collect(Collectors.toSet());
        if (doubleAssigned.isEmpty()) {
            break;
        }
        // pick the consumer holding the most duplicates (ties go to the smaller assignment)
        String toClean = currentAssignment.entrySet().stream().max(
            Comparator.comparing((Map.Entry<String, List<TopicPartition>> e) ->
                    e.getValue().stream().filter(doubleAssigned::contains).count())
                .thenComparing(e -> e.getValue().size(), Comparator.reverseOrder()))
            .get().getKey();
        currentAssignment.get(toClean).removeIf(doubleAssigned::contains);
    }

This is just an example. I hope this helps.
@steven-aerts If I understood your suggestion correctly, even though it will preserve stickiness in most situations, there are still examples where it fails to do so. For example, let's assume a group of 5 consumers: I think with generation markers we can preserve stickiness in the safest possible way: any consumer with a stale generation marker loses its assignment in case of a duplicate. I think the code doesn't have to change much to support that.
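The generation-marker idea might look roughly like this. It's a minimal sketch under assumed names and simplified types (`GenerationFilter`, `keepLatestGeneration`, and String partitions are all invented for illustration), not the actual StickyAssignor implementation:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GenerationFilter {
    // Keep only the assignments reported with the newest generation marker;
    // a consumer carrying a stale marker loses its claimed assignment, and a
    // consumer with no marker at all is kept for backwards compatibility.
    public static Map<String, List<String>> keepLatestGeneration(
            Map<String, List<String>> assignments, Map<String, Integer> generations) {
        int max = 0;
        for (int g : generations.values())
            max = Math.max(max, g);
        Map<String, List<String>> result = new HashMap<>();
        for (Map.Entry<String, List<String>> e : assignments.entrySet()) {
            if (generations.getOrDefault(e.getKey(), max) == max)
                result.put(e.getKey(), e.getValue());
            else
                result.put(e.getKey(), Collections.<String>emptyList());
        }
        return result;
    }
}
```

This drops stale claims without touching any up-to-date consumer's assignment, which is what preserves stickiness.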
@vahidhashemian Any solution is good for me, as long as it excludes duplicate assignments and preserves stickiness in 95% of the cases. So if you see a way to introduce a generation counter, that is also OK for me. Thanks
(force-pushed a949367 to 8714d0e)
cc @hachikuji, please review! Thanks!
Looks good.
The only place where I am doubting is the prevPartitions initialization.
Thanks
// note that a conflict could exists only if user data is for different generations
Map<TopicPartition, ConsumerGenerationPair> partitionConsumer = new HashMap<>();
int maxGeneration = 0;
Detail: just to be sure, I would initialize it to this.generation, to make sure the generation always increments.
My thinking was that this generation marker is okay to reset to 0 in case at some point in time none of the consumers in the group carry user data, which would have the same effect even if we start with this.generation here (if no consumer has user data, this.generation = 0). I made the change as you suggested.
if (consumerUserData.generation == existingRecord.generation) {
    // same partition is assigned to two consumers during the same rebalance
    throw new IllegalStateException("Error: Partition '" + partition + "' is assigned to multiple consumers " +
        "following sticky assignment generation " + consumerUserData.generation + ".");
Small remark: by throwing an exception here, you are giving up. But I am wondering if you should not try to fix it instead.
I do not know which consistency guarantees a Kafka consumer group gives exactly. But if it is possible to get into a split-brain situation (network split), where a consumer group is temporarily split in two and gets two leaders, you end up in a situation from which the StickyAssignor will never recover.
If it were me, I would write a big fat error and drop one of the two assignments.
The same for line #322.
That makes sense. I updated this to make sure it keeps running.
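The "log a big fat error and drop one of the two assignments" approach could be sketched like this. `ConflictResolver` and its `resolve` signature are invented for illustration; the real code works on deserialized user data rather than bare strings:

```java
import java.util.logging.Logger;

public class ConflictResolver {
    private static final Logger log = Logger.getLogger("ConflictResolver");

    // Given two claims on the same partition, keep the claim from the higher
    // generation. On a tie (which a split brain could produce), log loudly
    // and arbitrarily keep the first claim instead of throwing, so the
    // assignor keeps running.
    public static String resolve(String partition, String ownerA, int genA,
                                 String ownerB, int genB) {
        if (genA == genB) {
            log.severe("Partition " + partition + " claimed by " + ownerA + " and " + ownerB
                    + " in the same generation " + genA + "; dropping " + ownerB + "'s claim.");
            return ownerA;
        }
        return genA > genB ? ownerA : ownerB;
    }
}
```

The point of the design choice is availability: a duplicated claim degrades one consumer's stickiness for one rebalance instead of permanently wedging the group.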
@@ -421,10 +515,17 @@ private int getBalanceScore(Map<String, List<TopicPartition>> assignment) {
TreeSet<String> sortedConsumers = new TreeSet<>(new SubscriptionComparator(assignments));
sortedConsumers.addAll(assignments.keySet());
List<TopicPartition> prevPartitions = new ArrayList(prevAssignment.keySet());
Maybe I am wrong, but shouldn't this be moved inside the while loop, so it is initialized every time a new assignment is selected? Because now, I think, only the first consumer will be checked against these previous partitions.
You're right. The whole list needs to be compared against each consumer's assignment. Updated.
(force-pushed 8714d0e to 86fcf67)
@steven-aerts thanks for taking a look and raising those issues. I tried to address them in the new commit.
else if (consumerGeneration.generation == existingPair.generation)
    // same partition is assigned to two consumers during the same rebalance
    throw new IllegalStateException("Error: Partition '" + partition + "' had been assigned to " +
        "multiple consumers following sticky assignment generation " + consumerGeneration.generation + ".");
Just wondering if this exception stays (and did not become an error log) because it is impossible to hit and should have been prevented by the previous error log?
You're right. I missed replacing this throw statement. It's done in the new patch. Thanks!
(force-pushed 86fcf67 to 23b0ff4)
👍 cannot wait to see this merged.
I also like the fact that this is pure Java 7, so it can be backported to the bugfix releases.
clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java
(force-pushed 7740773 to 14e4d82)
Map<String, Subscription> subscriptions,
                          int generation) {
    return assign(partitionsPerTopic, subscriptions);
}

@Override
There's a formatting error at this line.
@@ -421,10 +518,18 @@ private int getBalanceScore(Map<String, List<TopicPartition>> assignment) {
TreeSet<String> sortedConsumers = new TreeSet<>(new SubscriptionComparator(assignments));
sortedConsumers.addAll(assignments.keySet());
List<TopicPartition> prevAssignedPartitions = new ArrayList(prevAssignment.keySet());
This ArrayList should be generic (that is, new ArrayList<>(...)).
(force-pushed 14e4d82 to 894e73f)
I may be missing something, but it seems to me like onAssignment() ought to be extracting the generation from the userData and updating the field value. Otherwise, won't all the non-leader members of the consumer group always keep sending generation=0 with their current assignments?
@kscaldef You are correct. Thanks for catching this issue. I'll try to fix it in the next commit.
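The fix being discussed might look roughly like this. It's a simplified sketch (plain String partitions, an invented class name, and an int parameter in place of the real Assignment object), showing only the idea that every member, not just the leader, records the generation delivered with its assignment:

```java
import java.util.Collections;
import java.util.List;

public class MemberStateSketch {
    // locally tracked generation, sent back as user data on the next rebalance;
    // without the fix this would stay at its initial value on non-leader members
    private int generation = -1;
    private List<String> memberAssignment = Collections.emptyList();

    // Every member records both the partitions it was handed and the
    // generation that assignment was computed in.
    public void onAssignment(List<String> partitions, int assignedGeneration) {
        this.memberAssignment = partitions;
        this.generation = assignedGeneration;
    }

    public int generation() {
        return generation;
    }
}
```

With this in place, a member that later becomes leader reports a current generation instead of generation=0.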
(force-pushed 2ed9a9c to 4cd3bd4)
Thanks, just one final api question. Once it is decided, we should update the KIP.
@@ -791,6 +792,11 @@ public Subscription subscription(final Set<String> topics) {
return taskIdsForConsumerAssignment;
}
+@Override
+public void onAssignment(final Assignment assignment, final Optional<Integer> generation) {
We should be able to get rid of this since we have the default method.
True. Removed in the recent patch.
@Override
-public void onAssignment(Assignment assignment) {
+public void onAssignment(Assignment assignment, Optional<Integer> generation) {
Trying to think through whether generation needs to be an optional. The call inside ConsumerCoordinator always passes a valid generation. The only case I can imagine is if an updated assignor gets mixed with an old client. I'm not sure that it's worth the api noise to try and handle that case. What do you think?
The impact in the case you mentioned will be minimal at best. I'd also like removing the optional. I've updated the code accordingly.
Is this fairly close to being ready to land? We'll be wanting to use the StickyAssignor when we deploy our latest project to production, but I'm uncomfortable leaving this window open for duplicate partition assignments. Can I help rebase/test?
This is a work in progress on my part. Delayed due to vacation. Will try to resume within a week or so.
@vahidhashemian Do you need any help finishing this up? Would be nice to get it over the line.
@hachikuji Sorry for the delay. I'll try to spend time on this over the weekend.
In the current implementation of the sticky assignor, the leader does not cache the most recently calculated assignment. It relies on the fact that each consumer in the group sends its subscribed topics and also its current assignment when a rebalance occurs. This could lead to the issue described in KAFKA-7026, in which the current assignment of a consumer is no longer valid and should be ignored. The solution implemented in this PR involves the leader caching the most recent assignment of each consumer, so the assignment reported by a consumer can be properly ignored if necessary.
… assignment by the leader. This helps in situations where a new leader is chosen and there are duplicates in the assignments reported by consumers.
(instead of the locally managed generation)
…ility Also, minor fixes and clarifying comments
(force-pushed 623e7cb to d13a973)
@hachikuji I submitted another patch to address your latest comments. Let me know what you think. Thanks!
(force-pushed ca03449 to a03a734)
@@ -25,6 +25,7 @@
import org.apache.kafka.common.errors.GroupMaxSizeReachedException;
import org.apache.kafka.common.errors.IllegalGenerationException;
import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.MemberIdRequiredException;
This was missed in the rebase.
@@ -652,7 +652,7 @@ public void testNoExceptionThrownWhenOnlySubscribedTopicDeleted() {
Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions);
subscriptions.put(consumer,
    new Subscription(topics(topic), StickyAssignor.serializeTopicPartitionAssignment(
-        new ConsumerUserData(assignment.get(consumer), 1))));
+        new ConsumerUserData(assignment.get(consumer), Optional.of(1)))));
This was missed in the rebase.
@vahidhashemian LGTM. Thanks for the iterations. Would you mind sending an update to the mailing list to mention the differences from the original KIP? We can wait a day or two to make sure there are no objections before merging.
apache#5291) This patch contains the implementation of KIP-341, which adds protection in the sticky assignor from consumers which are joining with a stale assignment. More details can be found in the proposal: https://cwiki.apache.org/confluence/display/KAFKA/KIP-341%3A+Update+Sticky+Assignor%27s+User+Data+Protocol. Reviewers: Steven Aerts <[email protected]>, Jason Gustafson <[email protected]>
In the current implementation of sticky assignor the leader does not cache the most recent calculated assignment. It relies on the fact that each consumer in the group sends its subscribed topics and also its current assignment when a rebalance occurs. This could lead to the issue described in KAFKA-7026, in which current assignment of a consumer is no longer valid and should be ignored. The solution implemented in this PR involves the leader caching the most recent assignment of each consumer, so the assignment reported by a consumer can be properly validated (and ignored if necessary) by the leader during a rebalance.
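The leader-side caching described above could be sketched like this. Names and types are hypothetical (`LeaderCacheSketch`, String partitions); the merged KIP-341 code additionally validates with generation markers rather than by exact-match alone:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LeaderCacheSketch {
    // leader-side cache of the most recently computed assignment per consumer
    private final Map<String, List<String>> lastComputed = new HashMap<>();

    // Remember what the leader handed out during the last rebalance.
    public void record(String consumer, List<String> assignment) {
        lastComputed.put(consumer, new ArrayList<>(assignment));
    }

    // Accept a consumer's reported assignment only if it matches what the
    // leader last handed out; otherwise ignore it (treat it as empty).
    public List<String> validate(String consumer, List<String> reported) {
        List<String> expected = lastComputed.get(consumer);
        return reported.equals(expected) ? reported : Collections.<String>emptyList();
    }
}
```

As the review thread pointed out, a cache like this only protects the sitting leader; a newly elected leader starts with an empty (or stale) cache, which is why the generation-based validation ultimately replaced this approach.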