Skip to content

Commit

Permalink
Fix HLC rebalance issues and add tests (#423)
Browse files Browse the repository at this point in the history
* Add rebalance test

* Add test involving fourth child

* move children rearer out

* Add more tests, updated rearer to kill randomly

* Add another test for the case of rebalancing of unassigned HLCs

* Fix HLC issue where rebalance does not happen if the consumer has not been assigned any partitions
Fixed issue where a rebalance can happen during closing

* Calling close on HLC will release owned partitions and unregister itself from the consumer group

* Fix CONNECTION_LOSS error when zk is closed before leaving group

* Improve stableness of rebalance tests

* Add zk test for unregistering a consumer from the group
  • Loading branch information
hyperlink authored Aug 5, 2016
1 parent d66c360 commit 437cdfd
Show file tree
Hide file tree
Showing 9 changed files with 1,034 additions and 498 deletions.
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ services:
KAFKA_SSL_KEY_PASSWORD: "password"
KAFKA_SSL_TRUSTSTORE_LOCATION: "/var/private/ssl/certs/server.truststore.jks"
KAFKA_SSL_TRUSTSTORE_PASSWORD: "password"
KAFKA_CREATE_TOPICS: "RebalanceTopic:3:1"
volumes:
- /var/run/docker.sock:/var/run/docker.sock
- ./docker/certs:/var/private/ssl/certs
141 changes: 86 additions & 55 deletions lib/highLevelConsumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ var HighLevelConsumer = function (client, topics, options) {
this.options = _.defaults((options || {}), DEFAULTS);
this.initialised = false;
this.ready = false;
this.closing = false;
this.paused = this.options.paused;
this.rebalancing = false;
this.id = this.options.id || this.options.groupId + '_' + uuid.v4();
Expand Down Expand Up @@ -133,7 +134,8 @@ HighLevelConsumer.prototype.connect = function () {
}

function rebalance () {
if (!self.rebalancing) {
debug('rebalance() %s is rebalancing: %s ready: %s', self.id, self.rebalancing, self.ready);
if (!self.rebalancing && !self.closing) {
deregister();

self.emit('rebalancing');
Expand Down Expand Up @@ -166,7 +168,10 @@ HighLevelConsumer.prototype.connect = function () {
if (err) {
self.rebalancing = false;
self.emit('error', err);
} else {
return;
}

if (self.topicPayloads.length) {
fetchAndUpdateOffsets(function (err) {
self.rebalancing = false;
if (err) {
Expand All @@ -176,6 +181,9 @@ HighLevelConsumer.prototype.connect = function () {
self.fetch();
self.emit('rebalanced');
});
} else { // was not assigned any partitions during rebalance
self.rebalancing = false;
self.emit('rebalanced');
}
});
}
Expand All @@ -190,14 +198,14 @@ HighLevelConsumer.prototype.connect = function () {
});

function register () {
debug('Registered listeners');
debug('Registered listeners %s', self.id);
// Register for re-balances (broker or consumer changes)
self.client.zk.on('consumersChanged', rebalance);
self.client.on('brokersChanged', rebalance);
}

function deregister () {
debug('Deregistered listeners');
debug('Deregistered listeners %s', self.id);
// Register for re-balances (broker or consumer changes)
self.client.zk.removeListener('consumersChanged', rebalance);
self.client.removeListener('brokersChanged', rebalance);
Expand Down Expand Up @@ -259,6 +267,38 @@ HighLevelConsumer.prototype.connect = function () {
});
};

HighLevelConsumer.prototype._releasePartitions = function (topicPayloads, callback) {
var self = this;
async.each(topicPayloads, function (tp, cbb) {
if (tp.partition !== undefined) {
async.series([
function (delcbb) {
self.client.zk.checkPartitionOwnership(self.id, self.options.groupId, tp.topic, tp.partition, function (err) {
if (err) {
// Partition doesn't exist simply carry on
cbb();
} else delcbb();
});
},
function (delcbb) {
self.client.zk.deletePartitionOwnership(self.options.groupId, tp.topic, tp.partition, delcbb);
},
function (delcbb) {
self.client.zk.checkPartitionOwnership(self.id, self.options.groupId, tp.topic, tp.partition, function (err) {
if (err) {
delcbb();
} else {
delcbb('Partition should not exist');
}
});
}],
cbb);
} else {
cbb();
}
}, callback);
};

HighLevelConsumer.prototype.rebalanceAttempt = function (oldTopicPayloads, cb) {
var self = this;
// Do the rebalance.....
Expand Down Expand Up @@ -291,52 +331,13 @@ HighLevelConsumer.prototype.rebalanceAttempt = function (oldTopicPayloads, cb) {
// Release current partitions
function (callback) {
debug('HighLevelConsumer %s releasing current partitions during rebalance', self.id);
async.eachSeries(oldTopicPayloads, function (tp, cbb) {
if (tp.partition !== undefined) {
async.series([
function (delcbb) {
self.client.zk.checkPartitionOwnership(self.id, self.options.groupId, tp.topic, tp.partition, function (err) {
if (err) {
// Partition doesn't exist simply carry on
cbb();
} else delcbb();
});
},
function (delcbb) {
self.client.zk.deletePartitionOwnership(self.options.groupId, tp.topic, tp.partition, function (err) {
if (err) {
delcbb(err);
} else delcbb();
});
},
function (delcbb) {
self.client.zk.checkPartitionOwnership(self.id, self.options.groupId, tp.topic, tp.partition, function (err) {
if (err) {
delcbb();
} else {
delcbb('Partition should not exist');
}
});
}],
function (err) {
if (err) cbb(err);
else cbb();
});
} else {
cbb();
}
}, function (err) {
if (err) {
callback(err);
} else {
callback();
}
});
self._releasePartitions(oldTopicPayloads, callback);
},

// Reblannce
function (callback) {
debug('HighLevelConsumer %s determining the partitions to own during rebalance', self.id);
debug('consumerPerTopicMap.consumerTopicMap %j', consumerPerTopicMap.consumerTopicMap);
for (var topic in consumerPerTopicMap.consumerTopicMap[self.id]) {
var topicToAdd = consumerPerTopicMap.consumerTopicMap[self.id][topic];
var numberOfConsumers = consumerPerTopicMap.topicConsumerMap[topicToAdd].length;
Expand Down Expand Up @@ -367,6 +368,7 @@ HighLevelConsumer.prototype.rebalanceAttempt = function (oldTopicPayloads, cb) {
});
}
}
debug('newTopicPayloads %j', newTopicPayloads);
callback();
},

Expand Down Expand Up @@ -500,6 +502,7 @@ HighLevelConsumer.prototype.fetch = function () {
};

HighLevelConsumer.prototype.fetchOffset = function (payloads, cb) {
debug('in fetchOffset %s payloads: %j', this.id, payloads);
this.client.sendOffsetFetchRequest(this.options.groupId, payloads, cb);
};

Expand All @@ -515,11 +518,12 @@ HighLevelConsumer.prototype.offsetRequest = function (payloads, cb) {
* @param {Client~failedToRegisterConsumerCallback} cb A function to call the consumer has been registered
*/
HighLevelConsumer.prototype.registerConsumer = function (cb) {
var self = this;
this.client.zk.registerConsumer(this.options.groupId, this.id, this.payloads, function (err) {
if (err) return cb(err);
self.client.zk.listConsumers(self.options.groupId);
cb();
});
this.client.zk.listConsumers(this.options.groupId);
};

HighLevelConsumer.prototype.addTopics = function (topics, cb) {
Expand Down Expand Up @@ -561,25 +565,52 @@ HighLevelConsumer.prototype.removeTopics = function (topics, cb) {
this.client.removeTopicMetadata(topics, cb);
};

HighLevelConsumer.prototype._leaveGroup = function (cb) {
var self = this;
async.parallel([
function (callback) {
if (self.topicPayloads.length) {
self._releasePartitions(self.topicPayloads, callback);
} else {
callback(null);
}
},
function (callback) {
self.client.zk.unregisterConsumer(self.options.groupId, self.id, callback);
}
], cb);
};

HighLevelConsumer.prototype.close = function (force, cb) {
var self = this;
this.ready = false;
this.closing = true;
clearInterval(this.checkPartitionOwnershipInterval);

if (typeof force === 'function') {
cb = force;
force = false;
}

if (force) {
this.commit(force, function (err) {
if (err) {
return cb(err);
async.series([
function (callback) {
self._leaveGroup(callback);
},
function (callback) {
if (force) {
async.series([
function (callback) {
self.commit(true, callback);
},
function (callback) {
self.client.close(callback);
}
], callback);
return;
}
this.client.close(cb);
}.bind(this));
} else {
this.client.close(cb);
}
self.client.close(callback);
}
], cb);
};

HighLevelConsumer.prototype.stop = function (cb) {
Expand Down
Loading

0 comments on commit 437cdfd

Please sign in to comment.