-
Notifications
You must be signed in to change notification settings - Fork 628
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
bugfix: make rebalance the right way #369
Conversation
var path = '/consumers/' + groupId + '/ids'; | ||
this.client.getChildren( | ||
path, | ||
function () { | ||
if (!that.closed) { | ||
that.listConsumers(groupId); | ||
if (!self.closed && watch) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
avoid unnecessary nesting watchers, referring to Zookeeper.prototype.topicExists
cbceff3
to
f46fd15
Compare
} | ||
} | ||
|
||
// Wait for the consumer to be ready | ||
this.on('registered', function () { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
consumersChanged
will trigger whenever init
method completes, so there is no necessary to listen to registered
event.
f46fd15
to
dcffbca
Compare
👍 .. I'm waiting for this PR to be merged so we can have multiple instances of the consumer with the same group ID work properly. |
@saurako, the maintainers are too slow to review this PR. So, I have published a forked module 'kafka-node2', https://www.npmjs.com/package/kafka-node2, you can try it now. |
@springuper awesome! I'll definitely try it out. Will keep you posted on how that goes. Thanks! |
I will try to review this soon. Sorry for the wait! |
@springuper thanks for the PR. |
@springuper This PR is awesome! Works like a charm and Save my day. |
this.client.zk.registerConsumer(this.options.groupId, this.id, this.payloads, function (err) { | ||
var zk = this.client.zk; | ||
var groupId = this.options.groupId; | ||
this.client.zk.registerConsumer(groupId, this.id, this.payloads, function (err) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You could have used zk
here or this.client.zk
again in the next statement for consistency
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, I just noticed how zk
is probably a bad var name choice as it conflicts with previous global var zk = require('./zookeeper')
assignment statement and might be just confusing
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Doesn't look like zk
is actually being used outside the context of the client
so we could delete line 12 and 13
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, I have updated this PR according to your useful advices.
@springuper can you rebase your branch with upstream/master and add some tests? 👍 |
6486c9c
to
045b46e
Compare
@hyperlink rebase done, and fix some conflicts. I found no tests currently about HighLevelConsumer, and there are some challenges about how to stub zk and kafka. I will try some time later. |
@springuper feel free to add |
Hi, any update? |
@pcothenet @springuper will need to rebase with upstream master since I've changed the coding style. And ideally I would also like to see some tests to accompany this PR too. |
still not fixed in the latest version(0.5.1) |
@lezi1022 I'm planning on incorporating rebalance fix in 0.5.3. |
fix #90.
After debugging in deep, I find out that in the following case
FailedToRebalanceConsumerError
definitely occurs:rebalance
rebalance
after a very short timerebalance
is done, and obtains all partitions, let's assume partition 1 and partition 2rebalance
and finds that partition 2 should be it's own, but it's already obtained by consumer A, thenFailedToRebalanceConsumerError
occursTo solve this problem, I change
rebalance
with pending status and always listen toconsumersChanged
event. The purpose is to makerebalance
once again when there are other consumers registered to ZK in the same timerebalance
is continuing.