From 437cdfddda1c2935f74db788802db6d6ea339053 Mon Sep 17 00:00:00 2001 From: Xiaoxin Lu Date: Fri, 5 Aug 2016 10:17:48 -0400 Subject: [PATCH] Fix HLC rebalance issues and add tests (#423) * Add rebalance test * Add test involving fourth child * move children rearer out * Add more tests, updated rearer to kill randomly * Add another test for the case of rebalancing of unassigned HLCs * Fix HLC issue where rebalance does not happen if the consumer has not been assigned any partitions Fixed issue where a rebalance can happen during closing * Calling close on HLC will release owned partitions and unregister itself from the consumer group * Fix CONNECTION_LOSS error when zk is closed before leaving group * Improve stableness of rebalance tests * Add zk test for unregistering a consumer from the group --- docker-compose.yml | 1 + lib/highLevelConsumer.js | 141 +++--- lib/zookeeper.js | 846 +++++++++++++++++---------------- test/helpers/Childrearer.js | 103 ++++ test/helpers/child-hlc.js | 70 +++ test/mocks/mockZookeeper.js | 3 + test/test.highlevelConsumer.js | 131 ++++- test/test.rebalance.js | 191 ++++++++ test/test.zookeeper.js | 46 +- 9 files changed, 1034 insertions(+), 498 deletions(-) create mode 100644 test/helpers/Childrearer.js create mode 100644 test/helpers/child-hlc.js create mode 100644 test/test.rebalance.js diff --git a/docker-compose.yml b/docker-compose.yml index b2bab431..5d8ab38e 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -21,6 +21,7 @@ services: KAFKA_SSL_KEY_PASSWORD: "password" KAFKA_SSL_TRUSTSTORE_LOCATION: "/var/private/ssl/certs/server.truststore.jks" KAFKA_SSL_TRUSTSTORE_PASSWORD: "password" + KAFKA_CREATE_TOPICS: "RebalanceTopic:3:1" volumes: - /var/run/docker.sock:/var/run/docker.sock - ./docker/certs:/var/private/ssl/certs diff --git a/lib/highLevelConsumer.js b/lib/highLevelConsumer.js index 977c096f..a80bc045 100644 --- a/lib/highLevelConsumer.js +++ b/lib/highLevelConsumer.js @@ -34,6 +34,7 @@ var HighLevelConsumer = function (client, topics, options) { this.options = _.defaults((options || {}), DEFAULTS); this.initialised = false; this.ready = false; + this.closing = false; this.paused = this.options.paused; this.rebalancing = false; this.id = this.options.id || this.options.groupId + '_' + uuid.v4(); @@ -133,7 +134,8 @@ HighLevelConsumer.prototype.connect = function () { } function rebalance () { - if (!self.rebalancing) { + debug('rebalance() %s is rebalancing: %s ready: %s', self.id, self.rebalancing, self.ready); + if (!self.rebalancing && !self.closing) { deregister(); self.emit('rebalancing'); @@ -166,7 +168,10 @@ HighLevelConsumer.prototype.connect = function () { if (err) { self.rebalancing = false; self.emit('error', err); - } else { + return; + } + + if (self.topicPayloads.length) { fetchAndUpdateOffsets(function (err) { self.rebalancing = false; if (err) { @@ -176,6 +181,9 @@ HighLevelConsumer.prototype.connect = function () { self.fetch(); self.emit('rebalanced'); }); + } else { // was not assigned any partitions during rebalance + self.rebalancing = false; + self.emit('rebalanced'); } }); } @@ -190,14 +198,14 @@ HighLevelConsumer.prototype.connect = function () { }); function register () { - debug('Registered listeners'); + debug('Registered listeners %s', self.id); // Register for re-balances (broker or consumer changes) self.client.zk.on('consumersChanged', rebalance); self.client.on('brokersChanged', rebalance); } function deregister () { - debug('Deregistered listeners'); + debug('Deregistered listeners %s', self.id); // Register for re-balances (broker or consumer changes) self.client.zk.removeListener('consumersChanged', rebalance); self.client.removeListener('brokersChanged', rebalance); @@ -259,6 +267,38 @@ HighLevelConsumer.prototype.connect = function () { }); }; +HighLevelConsumer.prototype._releasePartitions = function (topicPayloads, callback) { + var self = this; + async.each(topicPayloads, function (tp, cbb) { + if (tp.partition !== undefined) { + async.series([ + function (delcbb) { + self.client.zk.checkPartitionOwnership(self.id, self.options.groupId, tp.topic, tp.partition, function (err) { + if (err) { + // Partition doesn't exist simply carry on + cbb(); + } else delcbb(); + }); + }, + function (delcbb) { + self.client.zk.deletePartitionOwnership(self.options.groupId, tp.topic, tp.partition, delcbb); + }, + function (delcbb) { + self.client.zk.checkPartitionOwnership(self.id, self.options.groupId, tp.topic, tp.partition, function (err) { + if (err) { + delcbb(); + } else { + delcbb('Partition should not exist'); + } + }); + }], + cbb); + } else { + cbb(); + } + }, callback); +}; + HighLevelConsumer.prototype.rebalanceAttempt = function (oldTopicPayloads, cb) { var self = this; // Do the rebalance..... @@ -291,52 +331,13 @@ HighLevelConsumer.prototype.rebalanceAttempt = function (oldTopicPayloads, cb) { // Release current partitions function (callback) { debug('HighLevelConsumer %s releasing current partitions during rebalance', self.id); - async.eachSeries(oldTopicPayloads, function (tp, cbb) { - if (tp.partition !== undefined) { - async.series([ - function (delcbb) { - self.client.zk.checkPartitionOwnership(self.id, self.options.groupId, tp.topic, tp.partition, function (err) { - if (err) { - // Partition doesn't exist simply carry on - cbb(); - } else delcbb(); - }); - }, - function (delcbb) { - self.client.zk.deletePartitionOwnership(self.options.groupId, tp.topic, tp.partition, function (err) { - if (err) { - delcbb(err); - } else delcbb(); - }); - }, - function (delcbb) { - self.client.zk.checkPartitionOwnership(self.id, self.options.groupId, tp.topic, tp.partition, function (err) { - if (err) { - delcbb(); - } else { - delcbb('Partition should not exist'); - } - }); - }], - function (err) { - if (err) cbb(err); - else cbb(); - }); - } else { - cbb(); - } - }, function (err) { - if (err) { - callback(err); - } else { - callback(); - } - }); + self._releasePartitions(oldTopicPayloads, callback); }, // Reblannce function (callback) { debug('HighLevelConsumer %s determining the partitions to own during rebalance', self.id); + debug('consumerPerTopicMap.consumerTopicMap %j', consumerPerTopicMap.consumerTopicMap); for (var topic in consumerPerTopicMap.consumerTopicMap[self.id]) { var topicToAdd = consumerPerTopicMap.consumerTopicMap[self.id][topic]; var numberOfConsumers = consumerPerTopicMap.topicConsumerMap[topicToAdd].length; @@ -367,6 +368,7 @@ HighLevelConsumer.prototype.rebalanceAttempt = function (oldTopicPayloads, cb) { }); } } + debug('newTopicPayloads %j', newTopicPayloads); callback(); }, @@ -500,6 +502,7 @@ HighLevelConsumer.prototype.fetch = function () { }; HighLevelConsumer.prototype.fetchOffset = function (payloads, cb) { + debug('in fetchOffset %s payloads: %j', this.id, payloads); this.client.sendOffsetFetchRequest(this.options.groupId, payloads, cb); }; @@ -515,11 +518,12 @@ HighLevelConsumer.prototype.offsetRequest = function (payloads, cb) { * @param {Client~failedToRegisterConsumerCallback} cb A function to call the consumer has been registered */ HighLevelConsumer.prototype.registerConsumer = function (cb) { + var self = this; this.client.zk.registerConsumer(this.options.groupId, this.id, this.payloads, function (err) { if (err) return cb(err); + self.client.zk.listConsumers(self.options.groupId); cb(); }); - this.client.zk.listConsumers(this.options.groupId); }; HighLevelConsumer.prototype.addTopics = function (topics, cb) { @@ -561,8 +565,26 @@ HighLevelConsumer.prototype.removeTopics = function (topics, cb) { this.client.removeTopicMetadata(topics, cb); }; +HighLevelConsumer.prototype._leaveGroup = function (cb) { + var self = this; + async.parallel([ + function (callback) { + if (self.topicPayloads.length) { + self._releasePartitions(self.topicPayloads, callback); + } else { + callback(null); + } + }, + function (callback) { + self.client.zk.unregisterConsumer(self.options.groupId, self.id, callback); + } + ], cb); +}; + HighLevelConsumer.prototype.close = function (force, cb) { + var self = this; this.ready = false; + this.closing = true; clearInterval(this.checkPartitionOwnershipInterval); if (typeof force === 'function') { @@ -570,16 +592,25 @@ HighLevelConsumer.prototype.close = function (force, cb) { force = false; } - if (force) { - this.commit(force, function (err) { - if (err) { - return cb(err); + async.series([ + function (callback) { + self._leaveGroup(callback); + }, + function (callback) { + if (force) { + async.series([ + function (callback) { + self.commit(true, callback); + }, + function (callback) { + self.client.close(callback); + } + ], callback); + return; } - this.client.close(cb); - }.bind(this)); - } else { - this.client.close(cb); - } + self.client.close(callback); + } + ], cb); }; HighLevelConsumer.prototype.stop = function (cb) { diff --git a/lib/zookeeper.js b/lib/zookeeper.js index 90d06d4f..bc905d7a 100644 --- a/lib/zookeeper.js +++ b/lib/zookeeper.js @@ -1,411 +1,435 @@ -'use strict'; - -var zookeeper = require('node-zookeeper-client'); -var util = require('util'); -var async = require('async'); -var EventEmiter = require('events').EventEmitter; -var debug = require('debug')('kafka-node:zookeeper'); - -/** - * Provides kafka specific helpers for talking with zookeeper - * - * @param {String} [connectionString='localhost:2181/kafka0.8'] A list of host:port for each zookeeper node and - * optionally a chroot path - * - * @constructor - */ -var Zookeeper = function (connectionString, options) { - this.client = zookeeper.createClient(connectionString, options); - - var that = this; - this.client.on('connected', function () { - that.listBrokers(); - }); - this.client.on('disconnected', function () { - that.emit('disconnected'); - }); - this.client.connect(); -}; - -util.inherits(Zookeeper, EventEmiter); - -Zookeeper.prototype.registerConsumer = function (groupId, consumerId, payloads, cb) { - var path = '/consumers/' + groupId; - var that = this; - - async.series([ - /* eslint-disable handle-callback-err */ - function (callback) { - that.client.create( - path, - function (error, path) { - // simply carry on - callback(); - }); - }, - function (callback) { - that.client.create( - path + '/ids', - function (error, path) { - // simply carry on - callback(); - }); - }, - /* eslint-enable handle-callback-err */ - function (callback) { - that.client.create( - path + '/ids/' + consumerId, - null, - null, - zookeeper.CreateMode.EPHEMERAL, - function (error, path) { - if (error) { - callback(error); - } else { - callback(); - } - }); - }, - function (callback) { - var metadata = '{"version":1,"subscription":'; - metadata += '{'; - var sep = ''; - payloads.map(function (p) { - metadata += sep + '"' + p.topic + '": 1'; - sep = ', '; - }); - metadata += '}'; - var milliseconds = (new Date()).getTime(); - metadata += ',"pattern":"white_list","timestamp":"' + milliseconds + '"}'; - that.client.setData(path + '/ids/' + consumerId, new Buffer(metadata), function (error, stat) { - if (error) { - callback(error); - } else { - debug('Node: %s was created.', path + '/ids/' + consumerId); - cb(); - } - }); - }], - function (err) { - if (err) cb(err); - else cb(); - }); -}; - -Zookeeper.prototype.getConsumersPerTopic = function (groupId, cb) { - var consumersPath = '/consumers/' + groupId + '/ids'; - var brokerTopicsPath = '/brokers/topics'; - var that = this; - var consumerPerTopicMap = new ZookeeperConsumerMappings(); - - async.series([ - function (callback) { - that.client.getChildren(consumersPath, function (error, children, stats) { - if (error) { - callback(error); - return; - } else { - debug('Children are: %j.', children); - async.each(children, function (consumer, cbb) { - var path = consumersPath + '/' + consumer; - that.client.getData( - path, - function (error, data) { - if (error) { - cbb(error); - } else { - try { - var obj = JSON.parse(data.toString()); - // For each topic - for (var topic in obj.subscription) { - if (consumerPerTopicMap.topicConsumerMap[topic] == null) { - consumerPerTopicMap.topicConsumerMap[topic] = []; - } - consumerPerTopicMap.topicConsumerMap[topic].push(consumer); - - if (consumerPerTopicMap.consumerTopicMap[consumer] == null) { - consumerPerTopicMap.consumerTopicMap[consumer] = []; - } - consumerPerTopicMap.consumerTopicMap[consumer].push(topic); - } - - cbb(); - } catch (e) { - debug(e); - cbb(new Error('Unable to assemble data')); - } - } - } - ); - }, function (err) { - if (err) { - callback(err); - } else { - callback(); - } - }); - } - }); - }, - function (callback) { - Object.keys(consumerPerTopicMap.topicConsumerMap).forEach(function (key) { - consumerPerTopicMap.topicConsumerMap[key] = consumerPerTopicMap.topicConsumerMap[key].sort(); - }); - callback(); - }, - function (callback) { - async.each(Object.keys(consumerPerTopicMap.topicConsumerMap), function (topic, cbb) { - var path = brokerTopicsPath + '/' + topic; - that.client.getData( - path, - function (error, data) { - if (error) { - cbb(error); - } else { - var obj = JSON.parse(data.toString()); - // Get the topic partitions - var partitions = Object.keys(obj.partitions).map(function (partition) { - return partition; - }); - consumerPerTopicMap.topicPartitionMap[topic] = partitions.sort(compareNumbers); - cbb(); - } - } - ); - }, function (err) { - if (err) { - callback(err); - } else { - callback(); - } - }); - }], - function (err) { - if (err) { - debug(err); - cb(err); - } else cb(null, consumerPerTopicMap); - }); -}; - -function compareNumbers (a, b) { - return a - b; -} - -Zookeeper.prototype.listBrokers = function (cb) { - var that = this; - var path = '/brokers/ids'; - this.client.getChildren( - path, - function () { - that.listBrokers(); - }, - function (error, children) { - if (error) { - debug('Failed to list children of node: %s due to: %s.', path, error); - that.emit('error', error); - return; - } - - if (children.length) { - var brokers = {}; - async.each(children, getBrokerDetail, function (err) { - if (err) { - that.emit('error', err); - return; - } - if (!that.inited) { - that.emit('init', brokers); - that.inited = true; - } else { - that.emit('brokersChanged', brokers); - } - cb && cb(brokers); // For test - }); - } else { - if (that.inited) { - return that.emit('brokersChanged', {}); - } - that.inited = true; - that.emit('init', {}); - } - - function getBrokerDetail (id, cb) { - var path = '/brokers/ids/' + id; - that.client.getData(path, function (err, data) { - if (err) return cb(err); - brokers[id] = JSON.parse(data.toString()); - cb(); - }); - } - } - ); -}; - -Zookeeper.prototype.listConsumers = function (groupId) { - var that = this; - var path = '/consumers/' + groupId + '/ids'; - this.client.getChildren( - path, - function () { - if (!that.closed) { - that.listConsumers(groupId); - } - }, - function (error, children) { - if (error) { - debug(error); - // Ignore NO_NODE error here #157 - if (error.name !== 'NO_NODE') { - that.emit('error', error); - } - } else { - that.emit('consumersChanged'); - } - } - ); -}; - -Zookeeper.prototype.topicExists = function (topic, cb, watch) { - var path = '/brokers/topics/' + topic; - var self = this; - this.client.exists( - path, - function (event) { - debug('Got event: %s.', event); - if (watch) { - self.topicExists(topic, cb); - } - }, - function (error, stat) { - if (error) return cb(error); - cb(null, !!stat, topic); - } - ); -}; - -Zookeeper.prototype.deletePartitionOwnership = function (groupId, topic, partition, cb) { - var path = '/consumers/' + groupId + '/owners/' + topic + '/' + partition; - this.client.remove( - path, - function (error) { - if (error) { - cb(error); - } else { - debug('Removed partition ownership %s', path); - cb(); - } - } - ); -}; - -Zookeeper.prototype.addPartitionOwnership = function (consumerId, groupId, topic, partition, cb) { - var path = '/consumers/' + groupId + '/owners/' + topic + '/' + partition; - var self = this; - - async.series([ - /* eslint-disable handle-callback-err */ - function (callback) { - self.client.create( - '/consumers/' + groupId, - function (error, path) { - // simply carry on - callback(); - }); - }, - function (callback) { - self.client.create( - '/consumers/' + groupId + '/owners', - function (error, path) { - // simply carry on - callback(); - }); - }, - function (callback) { - self.client.create( - '/consumers/' + groupId + '/owners/' + topic, - function (error, path) { - // simply carry on - callback(); - }); - }, - /* eslint-enable handle-callback-err */ - function (callback) { - self.client.create( - path, - new Buffer(consumerId), - null, - zookeeper.CreateMode.EPHEMERAL, - function (error, path) { - if (error) { - callback(error); - } else callback(); - }); - }, - function (callback) { - self.client.exists(path, null, function (error, stat) { - if (error) { - callback(error); - } else if (stat) { - debug('Gained ownership of %s by %s.', path, consumerId); - callback(); - } else { - callback("Path wasn't created"); - } - }); - }], - function (err) { - if (err) cb(err); - else cb(); - }); -}; - -Zookeeper.prototype.checkPartitionOwnership = function (consumerId, groupId, topic, partition, cb) { - var path = '/consumers/' + groupId + '/owners/' + topic + '/' + partition; - var self = this; - - async.series([ - function (callback) { - self.client.exists(path, null, function (error, stat) { - if (error) { - callback(error); - } else if (stat) { - callback(); - } else { - callback("Path wasn't created"); - } - }); - }, - function (callback) { - self.client.getData( - path, - function (error, data) { - if (error) { - callback(error); - } else { - if (consumerId !== data.toString()) { - callback('Consumer not registered ' + consumerId); - } else callback(); - } - } - ); - }], - function (err) { - if (err) cb(err); - else cb(); - }); -}; - -Zookeeper.prototype.close = function () { - this.closed = true; - this.client.close(); -}; - -var ZookeeperConsumerMappings = function () { - this.consumerTopicMap = {}; - this.topicConsumerMap = {}; - this.topicPartitionMap = {}; -}; - -exports.Zookeeper = Zookeeper; -exports.ZookeeperConsumerMappings = ZookeeperConsumerMappings; +'use strict'; + +var zookeeper = require('node-zookeeper-client'); +var util = require('util'); +var async = require('async'); +var EventEmiter = require('events').EventEmitter; +var debug = require('debug')('kafka-node:zookeeper'); + +/** + * Provides kafka specific helpers for talking with zookeeper + * + * @param {String} [connectionString='localhost:2181/kafka0.8'] A list of host:port for each zookeeper node and + * optionally a chroot path + * + * @constructor + */ +var Zookeeper = function (connectionString, options) { + this.client = zookeeper.createClient(connectionString, options); + + var that = this; + this.client.on('connected', function () { + that.listBrokers(); + }); + this.client.on('disconnected', function () { + that.emit('disconnected'); + }); + this.client.connect(); +}; + +util.inherits(Zookeeper, EventEmiter); + +Zookeeper.prototype.unregisterConsumer = function (groupId, consumerId, cb) { + var path = '/consumers/' + groupId + '/ids/' + consumerId; + var self = this; + debug('unregister consumer node: %s', path); + async.waterfall([ + function (callback) { + self.client.exists(path, callback); + }, + function (stat, callback) { + if (stat) { + self.client.remove(path, function (error) { + if (error) { + debug(error); + return callback(error); + } + callback(null, true); + }); + } else { + callback(null, false); + } + } + ], cb); +}; + +Zookeeper.prototype.registerConsumer = function (groupId, consumerId, payloads, cb) { + var path = '/consumers/' + groupId; + var that = this; + + async.series([ + /* eslint-disable handle-callback-err */ + function (callback) { + that.client.create( + path, + function (error, path) { + // simply carry on + callback(); + }); + }, + function (callback) { + that.client.create( + path + '/ids', + function (error, path) { + // simply carry on + callback(); + }); + }, + /* eslint-enable handle-callback-err */ + function (callback) { + that.client.create( + path + '/ids/' + consumerId, + null, + null, + zookeeper.CreateMode.EPHEMERAL, + function (error, path) { + if (error) { + callback(error); + } else { + callback(); + } + }); + }, + function (callback) { + var metadata = '{"version":1,"subscription":'; + metadata += '{'; + var sep = ''; + payloads.map(function (p) { + metadata += sep + '"' + p.topic + '": 1'; + sep = ', '; + }); + metadata += '}'; + var milliseconds = (new Date()).getTime(); + metadata += ',"pattern":"white_list","timestamp":"' + milliseconds + '"}'; + that.client.setData(path + '/ids/' + consumerId, new Buffer(metadata), function (error, stat) { + if (error) { + callback(error); + } else { + debug('Node: %s was created.', path + '/ids/' + consumerId); + cb(); + } + }); + }], + function (err) { + if (err) cb(err); + else cb(); + }); +}; + +Zookeeper.prototype.getConsumersPerTopic = function (groupId, cb) { + var consumersPath = '/consumers/' + groupId + '/ids'; + var brokerTopicsPath = '/brokers/topics'; + var that = this; + var consumerPerTopicMap = new ZookeeperConsumerMappings(); + + async.series([ + function (callback) { + that.client.getChildren(consumersPath, function (error, children, stats) { + if (error) { + callback(error); + return; + } else { + debug('Children are: %j.', children); + async.each(children, function (consumer, cbb) { + var path = consumersPath + '/' + consumer; + that.client.getData( + path, + function (error, data) { + if (error) { + cbb(error); + } else { + try { + var obj = JSON.parse(data.toString()); + // For each topic + for (var topic in obj.subscription) { + if (consumerPerTopicMap.topicConsumerMap[topic] == null) { + consumerPerTopicMap.topicConsumerMap[topic] = []; + } + consumerPerTopicMap.topicConsumerMap[topic].push(consumer); + + if (consumerPerTopicMap.consumerTopicMap[consumer] == null) { + consumerPerTopicMap.consumerTopicMap[consumer] = []; + } + consumerPerTopicMap.consumerTopicMap[consumer].push(topic); + } + + cbb(); + } catch (e) { + debug(e); + cbb(new Error('Unable to assemble data')); + } + } + } + ); + }, function (err) { + if (err) { + callback(err); + } else { + callback(); + } + }); + } + }); + }, + function (callback) { + Object.keys(consumerPerTopicMap.topicConsumerMap).forEach(function (key) { + consumerPerTopicMap.topicConsumerMap[key] = consumerPerTopicMap.topicConsumerMap[key].sort(); + }); + callback(); + }, + function (callback) { + async.each(Object.keys(consumerPerTopicMap.topicConsumerMap), function (topic, cbb) { + var path = brokerTopicsPath + '/' + topic; + that.client.getData( + path, + function (error, data) { + if (error) { + cbb(error); + } else { + var obj = JSON.parse(data.toString()); + // Get the topic partitions + var partitions = Object.keys(obj.partitions).map(function (partition) { + return partition; + }); + consumerPerTopicMap.topicPartitionMap[topic] = partitions.sort(compareNumbers); + cbb(); + } + } + ); + }, function (err) { + if (err) { + callback(err); + } else { + callback(); + } + }); + }], + function (err) { + if (err) { + debug(err); + cb(err); + } else cb(null, consumerPerTopicMap); + }); +}; + +function compareNumbers (a, b) { + return a - b; +} + +Zookeeper.prototype.listBrokers = function (cb) { + var that = this; + var path = '/brokers/ids'; + this.client.getChildren( + path, + function () { + that.listBrokers(); + }, + function (error, children) { + if (error) { + debug('Failed to list children of node: %s due to: %s.', path, error); + that.emit('error', error); + return; + } + + if (children.length) { + var brokers = {}; + async.each(children, getBrokerDetail, function (err) { + if (err) { + that.emit('error', err); + return; + } + if (!that.inited) { + that.emit('init', brokers); + that.inited = true; + } else { + that.emit('brokersChanged', brokers); + } + cb && cb(brokers); // For test + }); + } else { + if (that.inited) { + return that.emit('brokersChanged', {}); + } + that.inited = true; + that.emit('init', {}); + } + + function getBrokerDetail (id, cb) { + var path = '/brokers/ids/' + id; + that.client.getData(path, function (err, data) { + if (err) return cb(err); + brokers[id] = JSON.parse(data.toString()); + cb(); + }); + } + } + ); +}; + +Zookeeper.prototype.listConsumers = function (groupId) { + var that = this; + var path = '/consumers/' + groupId + '/ids'; + this.client.getChildren( + path, + function () { + if (!that.closed) { + that.listConsumers(groupId); + } + }, + function (error, children) { + if (error) { + debug(error); + // Ignore NO_NODE error here #157 + if (error.name !== 'NO_NODE') { + that.emit('error', error); + } + } else { + that.emit('consumersChanged'); + } + } + ); +}; + +Zookeeper.prototype.topicExists = function (topic, cb, watch) { + var path = '/brokers/topics/' + topic; + var self = this; + this.client.exists( + path, + function (event) { + debug('Got event: %s.', event); + if (watch) { + self.topicExists(topic, cb); + } + }, + function (error, stat) { + if (error) return cb(error); + cb(null, !!stat, topic); + } + ); +}; + +Zookeeper.prototype.deletePartitionOwnership = function (groupId, topic, partition, cb) { + var path = '/consumers/' + groupId + '/owners/' + topic + '/' + partition; + this.client.remove( + path, + function (error) { + if (error) { + cb(error); + } else { + debug('Removed partition ownership %s', path); + cb(); + } + } + ); +}; + +Zookeeper.prototype.addPartitionOwnership = function (consumerId, groupId, topic, partition, cb) { + var path = '/consumers/' + groupId + '/owners/' + topic + '/' + partition; + var self = this; + + async.series([ + /* eslint-disable handle-callback-err */ + function (callback) { + self.client.create( + '/consumers/' + groupId, + function (error, path) { + // simply carry on + callback(); + }); + }, + function (callback) { + self.client.create( + '/consumers/' + groupId + '/owners', + function (error, path) { + // simply carry on + callback(); + }); + }, + function (callback) { + self.client.create( + '/consumers/' + groupId + '/owners/' + topic, + function (error, path) { + // simply carry on + callback(); + }); + }, + /* eslint-enable handle-callback-err */ + function (callback) { + self.client.create( + path, + new Buffer(consumerId), + null, + zookeeper.CreateMode.EPHEMERAL, + function (error, path) { + if (error) { + callback(error); + } else callback(); + }); + }, + function (callback) { + self.client.exists(path, null, function (error, stat) { + if (error) { + callback(error); + } else if (stat) { + debug('Gained ownership of %s by %s.', path, consumerId); + callback(); + } else { + callback("Path wasn't created"); + } + }); + }], + function (err) { + if (err) cb(err); + else cb(); + }); +}; + +Zookeeper.prototype.checkPartitionOwnership = function (consumerId, groupId, topic, partition, cb) { + var path = '/consumers/' + groupId + '/owners/' + topic + '/' + partition; + var self = this; + + async.series([ + function (callback) { + self.client.exists(path, null, function (error, stat) { + if (error) { + callback(error); + } else if (stat) { + callback(); + } else { + callback("Path wasn't created"); + } + }); + }, + function (callback) { + self.client.getData( + path, + function (error, data) { + if (error) { + callback(error); + } else { + if (consumerId !== data.toString()) { + callback('Consumer not registered ' + consumerId); + } else callback(); + } + } + ); + }], + function (err) { + if (err) cb(err); + else cb(); + }); +}; + +Zookeeper.prototype.close = function () { + this.closed = true; + this.client.close(); +}; + +var ZookeeperConsumerMappings = function () { + this.consumerTopicMap = {}; + this.topicConsumerMap = {}; + this.topicPartitionMap = {}; +}; + +exports.Zookeeper = Zookeeper; +exports.ZookeeperConsumerMappings = ZookeeperConsumerMappings; diff --git a/test/helpers/Childrearer.js b/test/helpers/Childrearer.js new file mode 100644 index 00000000..3b53f2a9 --- /dev/null +++ b/test/helpers/Childrearer.js @@ -0,0 +1,103 @@ +var EventEmitter = require('events'); +var util = require('util'); +var debug = require('debug')('kafka-node:Test-Childrearer'); +var fork = require('child_process').fork; +var async = require('async'); +var _ = require('lodash'); + +function Childrearer () { + EventEmitter.call(this); + this.children = []; + this.id = 0; +} + +util.inherits(Childrearer, EventEmitter); + +Childrearer.prototype.setVerifier = function (topic, groupId, verify) { + this.topic = topic; + this.groupId = groupId; + this.verify = verify; +}; + +Childrearer.prototype.nextId = function () { + return ++this.id; +}; + +Childrearer.prototype.closeAll = function () { + this.children.forEach(function (child) { + child.kill(); + }); +}; + +Childrearer.prototype.kill = function (numberOfChildren, callback) { + var children = _.sample(this.children, numberOfChildren); + this._killEachChild(children, callback); +}; + +Childrearer.prototype.killLast = function (callback) { + var child = _.last(this.children); + this._killEachChild([child], callback); +}; + +Childrearer.prototype.killFirst = function (callback) { + var child = _.first(this.children); + this._killEachChild([child], callback); +}; + +Childrearer.prototype._killEachChild = function (children, callback) { + var self = this; + async.each(children, function (child, callback) { + child.once('close', function (code, signal) { + debug('child %s killed %d %s', this._childNum, code, signal); + _.pull(self.children, this); + callback(); + }); + child.kill(); + }, callback); +}; + +Childrearer.prototype.raise = function (children, callback, waitTime) { + var newChildren = _.times(children, this._raiseChild, this); + + this.children = this.children.concat(newChildren); + + if (callback) { + async.series([ + function (callback) { + async.each(newChildren, function (child, callback) { + child.once('message', function (data) { + if (data.event === 'registered') { + callback(null); + } else { + callback(new Error('unregistered event: ' + data.event)); + } + }); + }, callback); + }, + + function (callback) { + if (waitTime) { + setTimeout(callback, waitTime); + } else { + callback(); + } + }], callback + ); + } +}; + +Childrearer.prototype._raiseChild = function () { + var self = this; + var childNumber = this.nextId(); + debug('forking child %d', childNumber); + var child = fork('test/helpers/child-hlc', ['--groupId=' + this.groupId, '--topic=' + this.topic, '--consumerId=' + 'child_' + childNumber]); + child._childNum = childNumber; + child.on('message', function (data) { + if (data.message) { + self.verify.call(this, data); + } + }); + return child; +}; + +module.exports = Childrearer; diff --git a/test/helpers/child-hlc.js b/test/helpers/child-hlc.js new file mode 100644 index 00000000..ba34a12e --- /dev/null +++ b/test/helpers/child-hlc.js @@ -0,0 +1,70 @@ +'use strict'; + +var kafka = require('../../'); +var HighLevelConsumer = kafka.HighLevelConsumer; +var Client = kafka.Client; +var argv = require('optimist').argv; +var topic = argv.topic || 'topic1'; +var uuid = require('node-uuid'); +var host = process.env['KAFKA_TEST_HOST'] || ''; +var client = new Client(host, 'child-' + uuid.v4(), {sessionTimeout: 5000}); +var topics = [{topic: topic}]; +var options = { autoCommit: true, fetchMaxWaitMs: 1000, fetchMaxBytes: 1024 * 1024 }; +var debug = require('debug')('kafka-node:Child-HLC'); + +if (argv.groupId) { + options.groupId = argv.groupId; +} + +if (argv.consumerId) { + options.id = argv.consumerId; +} + +var consumer = new HighLevelConsumer(client, topics, options); + +consumer.on('message', function (message) { + var out = { + id: consumer.id, + message: message + }; + process.send(out); +}); + +consumer.on('error', function (err) { + debug('error', err); +}); + +consumer.on('rebalanced', function () { + debug('%s rebalanced!', consumer.id); + sendEvent('rebalanced'); +}); + +consumer.on('rebalancing', function () { + debug('%s is rebalancing', consumer.id); +}); + +consumer.on('registered', function () { + debug('%s registered', consumer.id); + sendEvent('registered'); +}); + +function sendEvent (event) { + process.send({ + id: consumer.id, + event: event + }); +} + +function close (signal) { + return function () { + debug('closing the consumer (%s) [%s].', signal, consumer.id); + consumer.close(true, function () { + process.exit(); + }); + }; +} + +process.once('SIGINT', close('SIGINT')); +process.once('SIGTERM', close('SIGTERM')); +process.once('SIGABRT', close('SIGABRT')); +process.once('disconnect', close('disconnect')); diff --git a/test/mocks/mockZookeeper.js b/test/mocks/mockZookeeper.js index bce478b0..be3e2bb4 100644 --- a/test/mocks/mockZookeeper.js +++ b/test/mocks/mockZookeeper.js @@ -16,6 +16,9 @@ function FakeZookeeper () { this.registerConsumer = function (groupId, id, payloads, cb) { setImmediate(cb); }; + this.unregisterConsumer = function (groupId, id, cb) { + setImmediate(cb); + }; this.listConsumers = function (groupId) {}; this.getConsumersPerTopic = function (groupId, cb) { var consumerTopicMap = {}; diff --git a/test/test.highlevelConsumer.js b/test/test.highlevelConsumer.js index a0457b09..a116260d 100644 --- a/test/test.highlevelConsumer.js +++ b/test/test.highlevelConsumer.js @@ -7,6 +7,84 @@ var should = require('should'); var InvalidConfigError = require('../lib/errors/InvalidConfigError'); describe('HighLevelConsumer', function () { + describe('#close', function (done) { + var client, consumer, leaveGroupStub, commitStub, clientCloseSpy; + + beforeEach(function () { + client = new FakeClient(); + consumer = new HighLevelConsumer(client, [], {groupId: 'mygroup'}); + leaveGroupStub = sinon.stub(consumer, '_leaveGroup').yields(); + commitStub = sinon.stub(consumer, 'commit').yields(); + clientCloseSpy = sinon.spy(client, 'close'); + }); + + it('should leave consumer group, commit and then close', function (done) { + consumer.close(true, function (error) { + consumer.closing.should.be.true; + consumer.ready.should.be.false; + + sinon.assert.calledOnce(leaveGroupStub); + sinon.assert.calledOnce(commitStub); + sinon.assert.calledOnce(clientCloseSpy); + sinon.assert.callOrder(leaveGroupStub, commitStub, clientCloseSpy); + done(error); + }); + }); + + it('should leave consumer group, and then close', function (done) { + consumer.close(false, function (error) { + consumer.closing.should.be.true; + consumer.ready.should.be.false; + + sinon.assert.calledOnce(leaveGroupStub); + sinon.assert.calledOnce(clientCloseSpy); + sinon.assert.callOrder(leaveGroupStub, clientCloseSpy); + done(error); + }); + }); + + it('should leave consumer group, and then close (single callback argument)', function (done) { + consumer.close(function (error) { + consumer.closing.should.be.true; + consumer.ready.should.be.false; + + sinon.assert.calledOnce(leaveGroupStub); + sinon.assert.calledOnce(clientCloseSpy); + sinon.assert.callOrder(leaveGroupStub, clientCloseSpy); + done(error); + }); + }); + }); + + describe('#_leaveGroup', function () { + var client, consumer, unregisterSpy, releasePartitionsStub; + + beforeEach(function () { + client = new FakeClient(); + unregisterSpy = sinon.spy(client.zk, 'unregisterConsumer'); + consumer = new HighLevelConsumer(client, [], {groupId: 'mygroup'}); + releasePartitionsStub = sinon.stub(consumer, '_releasePartitions').yields(); + }); + + it('should releases partitions and unregister it self', function (done) { + consumer.topicPayloads = [{topic: 'fake-topic', partition: 0, offset: 0, maxBytes: 1048576, metadata: 'm'}]; + consumer._leaveGroup(function (error) { + sinon.assert.calledOnce(unregisterSpy); + sinon.assert.calledOnce(releasePartitionsStub); + done(error); + }); + }); + + it('should only unregister it self', function (done) { + consumer.topicPayloads = []; + consumer._leaveGroup(function (error) { + sinon.assert.notCalled(releasePartitionsStub); + sinon.assert.calledOnce(unregisterSpy); + done(error); + }); + }); + }); + describe('validate groupId', function () { function validateThrowsInvalidConfigError (groupId) { should.throws(function () { @@ -66,20 +144,25 @@ describe('HighLevelConsumer', function () { it('should emit rebalanced event and clear rebalancing flag only after offsets are updated', function (done) { client.emit('ready'); - highLevelConsumer.on('registered', function () { - // verify rebalancing is false until rebalance finishes - var refreshMetadataStub = sandbox.stub(client, 'refreshMetadata', function (topicNames, cb) { - highLevelConsumer.rebalancing.should.be.true; - setImmediate(cb); - }); + sinon.stub(highLevelConsumer, 'rebalanceAttempt', function (oldTopicPayloads, cb) { + highLevelConsumer.topicPayloads = [{topic: 'fake-topic', partition: 0, offset: 0, maxBytes: 1048576, metadata: 'm'}]; + cb(); + }); + + // verify rebalancing is false until rebalance finishes + var refreshMetadataStub = sandbox.stub(client, 'refreshMetadata', function (topicNames, cb) { + highLevelConsumer.rebalancing.should.be.true; + setImmediate(cb); + }); + highLevelConsumer.on('registered', function () { var sendOffsetFetchRequestStub = sandbox.stub(client, 'sendOffsetFetchRequest', function (groupId, payloads, cb) { highLevelConsumer.rebalancing.should.be.true; // wait for the results setImmediate(function () { // verify again before the callback highLevelConsumer.rebalancing.should.be.true; - cb(); + cb(null, [{topic: 'fake-topic', partition: 0, offset: 0, maxBytes: 1048576, metadata: 'm'}]); }); }); @@ -100,6 +183,11 @@ describe('HighLevelConsumer', function () { it('should emit error and clear rebalancing flag if fetchOffset failed', function (done) { client.emit('ready'); + sinon.stub(highLevelConsumer, 'rebalanceAttempt', function (oldTopicPayloads, cb) { + highLevelConsumer.topicPayloads = [{topic: 'fake-topic', partition: 0, offset: 0, maxBytes: 1048576, metadata: 'm'}]; + cb(); + }); + highLevelConsumer.on('registered', function () { sandbox.stub(client, 'sendOffsetFetchRequest', function (groupId, payloads, cb) { setImmediate(cb, new Error('Fetching offset failed')); @@ -122,21 +210,28 @@ describe('HighLevelConsumer', function () { it('should ignore fetch calls from "done" event handler during rebalance', function (done) { client.emit('ready'); + sinon.stub(highLevelConsumer, 'rebalanceAttempt', function (oldTopicPayloads, cb) { + highLevelConsumer.topicPayloads = [{topic: 'fake-topic', partition: 0, offset: 0, maxBytes: 1048576, metadata: 'm'}]; + cb(); + }); + var sendFetchRequestSpy = sandbox.spy(client, 'sendFetchRequest'); var fetchSpy = sandbox.spy(highLevelConsumer, 'fetch'); - highLevelConsumer.on('registered', function () { - client.sendOffsetFetchRequest = function (groupId, payloads, cb) { - // simulate a done event before offset fetch returns - highLevelConsumer.ready = true; - highLevelConsumer.paused = false; - highLevelConsumer.emit('done', {}); - - setTimeout(function () { - cb(); - }, 100); - }; + sandbox.stub(client, 'sendOffsetFetchRequest', function (groupId, payloads, cb) { + highLevelConsumer.rebalancing.should.be.true; + highLevelConsumer.ready = true; + highLevelConsumer.paused = false; + highLevelConsumer.emit('done', {}); + // wait for the results + setImmediate(function () { + // verify again before the callback + highLevelConsumer.rebalancing.should.be.true; + cb(null, [{topic: 'fake-topic', partition: 0, offset: 0, maxBytes: 1048576, metadata: 'm'}]); + }); + }); + highLevelConsumer.on('registered', function () { highLevelConsumer.on('rebalanced', function () { if (fetchSpy.callCount !== 2) { done(fetchSpy.callCount.should.equal(2)); diff --git a/test/test.rebalance.js b/test/test.rebalance.js new file mode 100644 index 00000000..9222c2a2 --- /dev/null +++ b/test/test.rebalance.js @@ -0,0 +1,191 @@ +'use strict'; + +var kafka = require('..'); +var Client = kafka.Client; +var HighLevelProducer = kafka.HighLevelProducer; +var async = require('async'); +var debug = require('debug')('kafka-node:Test-Rebalance'); +var Childrearer = require('./helpers/Childrearer'); +var uuid = require('node-uuid'); +var _ = require('lodash'); +var host = process.env['KAFKA_TEST_HOST'] || ''; + +describe('Integrated HLC Rebalance', function () { + var producer; + var topic = 'RebalanceTopic'; + var rearer; + var groupId = 'rebal_group'; + + function sendMessage (topic, message, done) { + producer.send([ {topic: topic, messages: [message]} ], function () { + debug('sent', message); + done(); + }); + } + + before(function (done) { + var client = new Client(host); + producer = new HighLevelProducer(client); + client.on('ready', function () { + client.refreshMetadata([topic], function (data) { + client.topicPartitions[topic].should.be.length(3); + done(); + }); + }); + }); + + beforeEach(function (done) { + rearer = new Childrearer(); + + // make sure there are no other consumers on this topic before starting test + producer.client.zk.getConsumersPerTopic(groupId, function (error, data) { + if (error && error.name === 'NO_NODE') { + done(); + } else { + data.consumerTopicMap.should.be.empty; + data.topicConsumerMap.should.be.empty; + data.topicPartitionMap.should.be.empty; + done(error); + } + }); + }); + + afterEach(function (done) { + debug('killChildren'); + rearer.closeAll(); + setTimeout(done, 500); + }); + + function sendMessages (messages, done) { + async.eachSeries(messages, function (message, cb) { + sendMessage(topic, message, cb); + }, function (error, results) { + if (error) { + console.error('Send Error', error); + return done(error); + } + debug('all messages sent'); + }); + } + + function getConsumerVerifier (messages, expectedPartitionsConsumed, expectedConsumersConsuming, done) { + var processedMessages = 0; + var consumedByConsumer = {}; + var verified = _.once(done); + + return function onData (data) { + debug('From child %d %j', this._childNum, data); + topic.should.be.eql(data.message.topic); + if (~messages.indexOf(data.message.value)) { + processedMessages++; + consumedByConsumer[data.id] = true; + } + if (processedMessages >= messages.length && Object.keys(consumedByConsumer).length === expectedConsumersConsuming) { + verified(); + } + }; + } + + function generateMessages (numberOfMessages, prefix) { + return _.times(numberOfMessages, function () { + return prefix + '-' + uuid.v4(); + }); + } + + it('verify two consumers consuming messages on all partitions', function (done) { + var messages = generateMessages(3, 'verify 2 c'); + var numberOfConsumers = 2; + + var verify = getConsumerVerifier(messages, 3, numberOfConsumers, done); + + rearer.setVerifier(topic, groupId, verify); + rearer.raise(numberOfConsumers, function () { + sendMessages(messages, done); + }); + }); + + it('verify three consumers consuming messages on all partitions', function (done) { + var messages = generateMessages(3, 'verify 3 c'); + var numberOfConsumers = 3; + + var verify = getConsumerVerifier(messages, 3, numberOfConsumers, done); + + rearer.setVerifier(topic, groupId, verify); + rearer.raise(numberOfConsumers); + + sendMessages(messages, done); + }); + + it('verify three of four consumers are consuming messages on all partitions', function (done) { + var messages = generateMessages(3, 'verify 4 c'); + + var verify = getConsumerVerifier(messages, 3, 3, done); + rearer.setVerifier(topic, groupId, verify); + rearer.raise(4); + + sendMessages(messages, done); + }); + + it('verify one consumer consumes all messages on all partitions after one out of the two consumer is killed', function (done) { + var messages = generateMessages(4, 'verify 1 c 1 killed'); + var verify = getConsumerVerifier(messages, 3, 1, done); + + rearer.setVerifier(topic, groupId, verify); + rearer.raise(2, function () { + rearer.kill(1, function () { + sendMessages(messages, done); + }); + }, 500); + }); + + it('verify three consumer consumes all messages on all partitions after one that is unassigned is killed', function (done) { + var messages = generateMessages(3, 'verify 2 c 2 killed'); + var verify = getConsumerVerifier(messages, 3, 3, done); + + rearer.setVerifier(topic, groupId, verify); + + async.series([ + function (callback) { + rearer.raise(3, callback); + }, + function (callback) { + setTimeout(callback, 1000); + }, + function (callback) { + rearer.raise(1, callback); + }, + function (callback) { + setTimeout(callback, 1000); + }, + function (callback) { + rearer.killFirst(callback); + } + ], function () { + sendMessages(messages, done); + }); + }); + + it('verify two consumer consumes all messages on all partitions after two out of the four consumers are killed', function (done) { + var messages = generateMessages(4, 'verify 2 c 2 killed'); + var verify = getConsumerVerifier(messages, 3, 2, done); + + rearer.setVerifier(topic, groupId, verify); + rearer.raise(4, function () { + rearer.kill(2, function () { + sendMessages(messages, done); + }); + }, 500); + }); + + it('verify three consumer consumes all messages on all partitions after three out of the six consumers are killed', function (done) { + var messages = generateMessages(4, 'verify 3 c 3 killed'); + var verify = getConsumerVerifier(messages, 3, 3, done); + + rearer.setVerifier(topic, groupId, verify); + rearer.raise(6, function () { + rearer.kill(3, function () { + sendMessages(messages, done); + }); + }, 1000); + }); +}); diff --git a/test/test.zookeeper.js b/test/test.zookeeper.js index 7eaa149d..cacde5cc 100644 --- a/test/test.zookeeper.js +++ b/test/test.zookeeper.js @@ -1,25 +1,18 @@ 'use strict'; -var libPath = process.env['kafka-cov'] ? '../lib-cov/' : '../lib/'; -var Zookeeper = require(libPath + 'zookeeper').Zookeeper; +var Zookeeper = require('../lib/zookeeper').Zookeeper; var host = process.env['KAFKA_TEST_HOST'] || ''; var zk; -/* - * To run the test, you should ensure: - * - at least 2 broker running - */ - -xdescribe('Zookeeper', function () { +describe('Zookeeper', function () { before(function () { zk = new Zookeeper(host); }); describe('when init success', function () { it('should emit init event', function (done) { - var zk = new Zookeeper(host); - zk.on('init', function (brokers) { - Object.keys(brokers).length.should.above(1); + zk.once('init', function (brokers) { + Object.keys(brokers).length.should.eql(1); done(); }); }); @@ -30,7 +23,7 @@ xdescribe('Zookeeper', function () { it('should return all brokers', function (done) { zk.inited = false; zk.listBrokers(function (brokers) { - Object.keys(brokers).length.should.above(1); + Object.keys(brokers).length.should.eql(1); done(); }); }); @@ -42,18 +35,43 @@ xdescribe('Zookeeper', function () { zk.inited = true; zk.listBrokers(function (brokers) { - Object.keys(brokers).length.should.above(1); + Object.keys(brokers).length.should.eql(1); if (++count === 2) done(); }); zk.on('brokersChanged', function (brokers) { - Object.keys(brokers).length.should.above(1); + Object.keys(brokers).length.should.eql(1); if (++count === 2) done(); }); }); }); }); + describe('#unregisterConsumer', function () { + it('removes the consumer ID node if it exists yields true if successful', function (done) { + var groupId = 'awesomeFakeGroupId'; + var consumerId = 'fabulousConsumerId'; + zk.registerConsumer(groupId, consumerId, [{topic: 'fake-topic'}], + function (error) { + if (error) { + return done(error); + } + + zk.unregisterConsumer(groupId, consumerId, function (error, result) { + result.should.be.true; + done(error); + }); + }); + }); + + it('yields false if the consumer ID node does not exists', function (done) { + zk.unregisterConsumer('fakeTestGroupId', 'fakeConsumerId', function (error, result) { + result.should.be.false; + done(error); + }); + }); + }); + describe('#topicExists', function () { it('should return false when topic not exist', function (done) { zk.topicExists('_not_exist_topic_test', function (err, existed, topic) {