Skip to content

Commit

Permalink
bugfix: make rebalance the right way
Browse files Browse the repository at this point in the history
  • Loading branch information
springuper committed May 12, 2016
1 parent 3813c7d commit dcffbca
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 33 deletions.
47 changes: 19 additions & 28 deletions lib/highLevelConsumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ var HighLevelConsumer = function (client, topics, options) {
this.ready = false;
this.paused = this.options.paused;
this.rebalancing = false;
this.pending = false;
this.id = this.options.id || this.options.groupId + '_' + uuid.v4();
this.payloads = this.buildPayloads(topics);
this.topicPayloads = this.buildTopicPayloads(topics);
Expand Down Expand Up @@ -142,8 +143,6 @@ HighLevelConsumer.prototype.connect = function () {
function rebalance() {

if (!self.rebalancing) {
deregister();

self.emit('rebalancing');

self.rebalancing = true;
Expand Down Expand Up @@ -171,45 +170,38 @@ HighLevelConsumer.prototype.connect = function () {
return p.topic;
});
self.client.refreshMetadata(topicNames, function (err) {
register();
self.rebalancing = false;
if (err) {
self.emit('error', err);
self.emit('error', err);
} else {
self.emit('rebalanced');
self.emit('rebalanced');
}
if (self.pending) {
self.pending = false;
rebalance();
}
});
}
});
});
} else {
self.pending = true;
}
}

// Wait for the consumer to be ready
this.on('registered', function () {
rebalance();
});

function register() {
debug("Registered listeners");
// Register for re-balances (broker or consumer changes)
self.client.zk.on('consumersChanged', rebalance);
self.client.on('brokersChanged', rebalance);
}

function deregister() {
debug("Deregistered listeners");
// Register for re-balances (broker or consumer changes)
self.client.zk.removeListener('consumersChanged', rebalance);
self.client.removeListener('brokersChanged', rebalance);
}
register();

function attachZookeeperErrorListener() {
self.client.zk.on('error', function (err) {
self.emit('error', err);
});
self.emit('error', err);
});
}

attachZookeeperErrorListener();

this.client.on('zkReconnect', function () {
Expand Down Expand Up @@ -248,7 +240,6 @@ HighLevelConsumer.prototype.connect = function () {
});
});


// 'done' will be emit when a message fetch request complete
this.on('done', function (topics) {
self.updateOffsets(topics);
Expand Down Expand Up @@ -436,7 +427,6 @@ HighLevelConsumer.prototype.rebalanceAttempt = function (oldTopicPayloads, cb) {
};

HighLevelConsumer.prototype.init = function () {

var self = this;

if (!self.topicPayloads.length) {
Expand All @@ -445,11 +435,10 @@ HighLevelConsumer.prototype.init = function () {

self.registerConsumer(function (err) {
if (err) {
return self.emit('error', new errors.FailedToRegisterConsumerError(err.toString()));
self.emit('error', new errors.FailedToRegisterConsumerError(err.toString()));
return;
}

// Close the
return self.emit('registered');
self.emit('registered');
});
};

Expand Down Expand Up @@ -523,11 +512,13 @@ HighLevelConsumer.prototype.offsetRequest = function (payloads, cb) {
* @param {Client~failedToRegisterConsumerCallback} cb A function to call the consumer has been registered
*/
HighLevelConsumer.prototype.registerConsumer = function (cb) {
this.client.zk.registerConsumer(this.options.groupId, this.id, this.payloads, function (err) {
var zk = this.client.zk;
var groupId = this.options.groupId;
this.client.zk.registerConsumer(groupId, this.id, this.payloads, function (err) {
if (err) return cb(err);
zk.listConsumers(groupId, true);
cb();
})
this.client.zk.listConsumers(this.options.groupId);
};

HighLevelConsumer.prototype.addTopics = function (topics, cb) {
Expand Down
10 changes: 5 additions & 5 deletions lib/zookeeper.js
Original file line number Diff line number Diff line change
Expand Up @@ -248,24 +248,24 @@ Zookeeper.prototype.listBrokers = function (cb) {


Zookeeper.prototype.listConsumers = function (groupId) {
var that = this;
var self = this;
var path = '/consumers/' + groupId + '/ids';
this.client.getChildren(
path,
function () {
if (!that.closed) {
that.listConsumers(groupId);
if (!self.closed) {
self.listConsumers(groupId);
}
},
function (error, children) {
if (error) {
debug(error);
// Ignore NO_NODE error here #157
if (error.name !== 'NO_NODE') {
that.emit('error', error);
self.emit('error', error);
}
} else {
that.emit('consumersChanged');
self.emit('consumersChanged');
}
}
);
Expand Down

0 comments on commit dcffbca

Please sign in to comment.