Skip to content

Commit

Permalink
make rebalance the right way
Browse files Browse the repository at this point in the history
  • Loading branch information
springuper committed Jun 24, 2016
1 parent 009a6a1 commit 045b46e
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 36 deletions.
57 changes: 26 additions & 31 deletions lib/highLevelConsumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ var util = require('util'),
Offset = require('./offset'),
async = require("async"),
errors = require('./errors'),
zk = require('./zookeeper'),
ZookeeperConsumerMappings = zk.ZookeeperConsumerMappings,
retry = require('retry'),
debug = require('debug')('kafka-node:HighLevelConsumer');

Expand All @@ -22,6 +20,7 @@ var DEFAULTS = {
autoCommitIntervalMs: 5000,
// Fetch message config
fetchMaxWaitMs: 100,
rebalanceDelayMs: 0,
paused: false,
maxNumSegments: 1000,
fetchMinBytes: 1,
Expand All @@ -48,6 +47,7 @@ var HighLevelConsumer = function (client, topics, options) {
this.ready = false;
this.paused = this.options.paused;
this.rebalancing = false;
this.pending = false;
this.id = this.options.id || this.options.groupId + '_' + uuid.v4();
this.payloads = this.buildPayloads(topics);
this.topicPayloads = this.buildTopicPayloads(topics);
Expand Down Expand Up @@ -143,8 +143,6 @@ HighLevelConsumer.prototype.connect = function () {
function rebalance() {

if (!self.rebalancing) {
deregister();

self.emit('rebalancing');

self.rebalancing = true;
Expand Down Expand Up @@ -172,45 +170,43 @@ HighLevelConsumer.prototype.connect = function () {
return p.topic;
});
self.client.refreshMetadata(topicNames, function (err) {
register();
self.rebalancing = false;
if (err) {
self.emit('error', err);
self.emit('error', err);
} else {
self.emit('rebalanced');
self.emit('rebalanced');
}
if (self.pending) {
self.pending = false;
rebalance();
}
});
}
});
});
} else {
self.pending = true;
}
}

// Wait for the consumer to be ready
this.on('registered', function () {
rebalance();
});

function register() {
function registerListeners() {
debug("Registered listeners");
// Register for re-balances (broker or consumer changes)
self.client.zk.on('consumersChanged', rebalance);
self.client.on('brokersChanged', rebalance);
}

function deregister() {
debug("Deregistered listeners");
// Register for re-balances (broker or consumer changes)
self.client.zk.removeListener('consumersChanged', rebalance);
self.client.removeListener('brokersChanged', rebalance);
}
self.on('registered', function () {
setTimeout(function () {
registerListeners();
rebalance();
}, self.options.rebalanceDelayMs);
});

function attachZookeeperErrorListener() {
self.client.zk.on('error', function (err) {
self.emit('error', err);
});
self.emit('error', err);
});
}

attachZookeeperErrorListener();

this.client.on('zkReconnect', function () {
Expand Down Expand Up @@ -249,7 +245,6 @@ HighLevelConsumer.prototype.connect = function () {
});
});


// 'done' will be emit when a message fetch request complete
this.on('done', function (topics) {
self.updateOffsets(topics);
Expand Down Expand Up @@ -443,7 +438,6 @@ HighLevelConsumer.prototype.rebalanceAttempt = function (oldTopicPayloads, cb) {
};

HighLevelConsumer.prototype.init = function () {

var self = this;

if (!self.topicPayloads.length) {
Expand All @@ -452,11 +446,10 @@ HighLevelConsumer.prototype.init = function () {

self.registerConsumer(function (err) {
if (err) {
return self.emit('error', new errors.FailedToRegisterConsumerError(err.toString()));
self.emit('error', new errors.FailedToRegisterConsumerError(err.toString()));
return;
}

// Close the
return self.emit('registered');
self.emit('registered');
});
};

Expand Down Expand Up @@ -530,11 +523,13 @@ HighLevelConsumer.prototype.offsetRequest = function (payloads, cb) {
* @param {Client~failedToRegisterConsumerCallback} cb A function to call the consumer has been registered
*/
HighLevelConsumer.prototype.registerConsumer = function (cb) {
this.client.zk.registerConsumer(this.options.groupId, this.id, this.payloads, function (err) {
var zk = this.client.zk;
var groupId = this.options.groupId;
zk.registerConsumer(groupId, this.id, this.payloads, function (err) {
if (err) return cb(err);
zk.listConsumers(groupId);
cb();
});
this.client.zk.listConsumers(this.options.groupId);
};

HighLevelConsumer.prototype.addTopics = function (topics, cb) {
Expand Down
10 changes: 5 additions & 5 deletions lib/zookeeper.js
Original file line number Diff line number Diff line change
Expand Up @@ -248,24 +248,24 @@ Zookeeper.prototype.listBrokers = function (cb) {


Zookeeper.prototype.listConsumers = function (groupId) {
var that = this;
var self = this;
var path = '/consumers/' + groupId + '/ids';
this.client.getChildren(
path,
function () {
if (!that.closed) {
that.listConsumers(groupId);
if (!self.closed) {
self.listConsumers(groupId);
}
},
function (error, children) {
if (error) {
debug(error);
// Ignore NO_NODE error here #157
if (error.name !== 'NO_NODE') {
that.emit('error', error);
self.emit('error', error);
}
} else {
that.emit('consumersChanged');
self.emit('consumersChanged');
}
}
);
Expand Down

0 comments on commit 045b46e

Please sign in to comment.