From dcffbcaf3122489a637dc66f8bc64f6312287d15 Mon Sep 17 00:00:00 2001 From: springuper Date: Wed, 11 May 2016 20:19:26 +0800 Subject: [PATCH] bugfix: make rebalance the right way --- lib/highLevelConsumer.js | 47 ++++++++++++++++------------------------ lib/zookeeper.js | 10 ++++----- 2 files changed, 24 insertions(+), 33 deletions(-) diff --git a/lib/highLevelConsumer.js b/lib/highLevelConsumer.js index a1bd9cf9..fc5cf5ef 100644 --- a/lib/highLevelConsumer.js +++ b/lib/highLevelConsumer.js @@ -48,6 +48,7 @@ var HighLevelConsumer = function (client, topics, options) { this.ready = false; this.paused = this.options.paused; this.rebalancing = false; + this.pending = false; this.id = this.options.id || this.options.groupId + '_' + uuid.v4(); this.payloads = this.buildPayloads(topics); this.topicPayloads = this.buildTopicPayloads(topics); @@ -142,8 +143,6 @@ HighLevelConsumer.prototype.connect = function () { function rebalance() { if (!self.rebalancing) { - deregister(); - self.emit('rebalancing'); self.rebalancing = true; @@ -171,45 +170,38 @@ HighLevelConsumer.prototype.connect = function () { return p.topic; }); self.client.refreshMetadata(topicNames, function (err) { - register(); self.rebalancing = false; if (err) { - self.emit('error', err); + self.emit('error', err); } else { - self.emit('rebalanced'); + self.emit('rebalanced'); + } + if (self.pending) { + self.pending = false; + rebalance(); } }); } }); }); + } else { + self.pending = true; } } - // Wait for the consumer to be ready - this.on('registered', function () { - rebalance(); - }); - function register() { debug("Registered listeners"); // Register for re-balances (broker or consumer changes) self.client.zk.on('consumersChanged', rebalance); self.client.on('brokersChanged', rebalance); } - - function deregister() { - debug("Deregistered listeners"); - // Register for re-balances (broker or consumer changes) - self.client.zk.removeListener('consumersChanged', rebalance); - self.client.removeListener('brokersChanged', rebalance); - } + register(); function attachZookeeperErrorListener() { self.client.zk.on('error', function (err) { - self.emit('error', err); - }); + self.emit('error', err); + }); } - attachZookeeperErrorListener(); this.client.on('zkReconnect', function () { @@ -248,7 +240,6 @@ HighLevelConsumer.prototype.connect = function () { }); }); - // 'done' will be emit when a message fetch request complete this.on('done', function (topics) { self.updateOffsets(topics); @@ -436,7 +427,6 @@ HighLevelConsumer.prototype.rebalanceAttempt = function (oldTopicPayloads, cb) { }; HighLevelConsumer.prototype.init = function () { - var self = this; if (!self.topicPayloads.length) { @@ -445,11 +435,10 @@ HighLevelConsumer.prototype.init = function () { self.registerConsumer(function (err) { if (err) { - return self.emit('error', new errors.FailedToRegisterConsumerError(err.toString())); + self.emit('error', new errors.FailedToRegisterConsumerError(err.toString())); + return; } - - // Close the - return self.emit('registered'); + self.emit('registered'); }); }; @@ -523,11 +512,13 @@ HighLevelConsumer.prototype.offsetRequest = function (payloads, cb) { * @param {Client~failedToRegisterConsumerCallback} cb A function to call the consumer has been registered */ HighLevelConsumer.prototype.registerConsumer = function (cb) { - this.client.zk.registerConsumer(this.options.groupId, this.id, this.payloads, function (err) { + var zk = this.client.zk; + var groupId = this.options.groupId; + this.client.zk.registerConsumer(groupId, this.id, this.payloads, function (err) { if (err) return cb(err); + zk.listConsumers(groupId, true); cb(); }) - this.client.zk.listConsumers(this.options.groupId); }; HighLevelConsumer.prototype.addTopics = function (topics, cb) { diff --git a/lib/zookeeper.js b/lib/zookeeper.js index b05554b9..8ac0fa93 100644 --- a/lib/zookeeper.js +++ b/lib/zookeeper.js @@ -248,13 +248,13 @@ Zookeeper.prototype.listBrokers = function (cb) { Zookeeper.prototype.listConsumers = function (groupId) { - var that = this; + var self = this; var path = '/consumers/' + groupId + '/ids'; this.client.getChildren( path, function () { - if (!that.closed) { - that.listConsumers(groupId); + if (!self.closed) { + self.listConsumers(groupId); } }, function (error, children) { @@ -262,10 +262,10 @@ Zookeeper.prototype.listConsumers = function (groupId) { debug(error); // Ignore NO_NODE error here #157 if (error.name !== 'NO_NODE') { - that.emit('error', error); + self.emit('error', error); } } else { - that.emit('consumersChanged'); + self.emit('consumersChanged'); } } );