diff --git a/lib/RESTClient.js b/lib/RESTClient.js index 4f7d8f2..079b147 100644 --- a/lib/RESTClient.js +++ b/lib/RESTClient.js @@ -330,6 +330,38 @@ class RESTClient { * @return {undefined} */ request(method, beginPath, log, params, data, callback) { + this.requestStreamed( + method, beginPath, log, params, data, (err, res) => { + if (err) { + return callback(err); + } + const ret = []; + let retLen = 0; + res.on('data', data => { + ret.push(data); + retLen += data.length; + }).on('error', callback) + .on('end', () => { + callback(null, Buffer.concat(ret, retLen).toString()); + }); + return undefined; + }); + } + + /** + * Streamed version of RESTClient.request() returning a byte + * stream object in callback + * + * @param {string} method - the HTTP method of the request + * @param {string} beginPath - formated path without parameters + * @param {object} log - logger + * @param {object} params - parameters of the request + * @param {string} data - data of the request + * @param {function} callback - callback + * + * @return {undefined} + */ + requestStreamed(method, beginPath, log, params, data, callback) { log.debug('sending request', { httpMethod: method, beginPath }); let path = beginPath; assert(typeof callback === 'function', 'callback must be a function'); @@ -355,9 +387,6 @@ class RESTClient { options.key = this._key; options.cert = this._cert; } - const ret = []; - let retLen = 0; - // somehow options can get cyclical so fields need to be chosen // for display log.debug('direct request to MD endpoint', { httpMethod: method, @@ -381,19 +410,12 @@ class RESTClient { req.write(binData); } - req.on('response', res => { - res.on('data', data => { - ret.push(data); - retLen += data.length; - }).on('error', callback).on('end', () => { - this.endRespond(res, Buffer.concat(ret, retLen).toString(), - log, callback); - }); - }).on('error', error => { - // covers system errors like ECONNREFUSED, ECONNRESET etc. - log.error('error sending request to metadata', { error }); - callback(errors.InternalError); - }).end(); + req.on('response', res => this.endRespond(res, res, log, callback)) + .on('error', error => { + // covers system errors like ECONNREFUSED, ECONNRESET etc. + log.error('error sending request to metadata', { error }); + callback(errors.InternalError); + }).end(); } /** @@ -414,7 +436,7 @@ class RESTClient { /** * Get raft logs from bucketd - * get server's response and return it + * get server's response and return it as a stream * * @param {string} raftId - raft session id * @param {number} [start=undefined] - starting sequence number. If it is @@ -425,7 +447,7 @@ class RESTClient { * @param {boolean} [targetLeader=undefined] - true: from leader instead of * follower * @param {string} reqUids - the identifier of the request - * @param {callback} callback - callback(err, info) + * @param {callback} callback - callback(err, stream) * @param {werelogs.Logger} [logger] - Logger instance * @return {undefined} */ @@ -443,7 +465,7 @@ class RESTClient { query.targetLeader = targetLeader; } log.debug('getting raft log', { raftId, path, query }); - this.request('GET', path, log, query, null, callback); + this.requestStreamed('GET', path, log, query, null, callback); } } diff --git a/tests/functional/client.js b/tests/functional/client.js index 7150db7..71d0915 100644 --- a/tests/functional/client.js +++ b/tests/functional/client.js @@ -81,32 +81,42 @@ describe('Bucket Client tests', function testClient() { it('should get logs from bucketd', done => { const start = 1; const end = 10; - client.getRaftLog(0, 1, 10, true, null, (err, data) => { + client.getRaftLog(0, 1, 10, true, null, (err, stream) => { if (err) { return done(err); } - const obj = JSON.parse(data); - assert.strictEqual(Object.prototype.hasOwnProperty.call( - obj, 'info'), true); - const info = obj.info; - assert.strictEqual(['start', 'end', 'cseq', 'prune'] - .every(key => Object.prototype.hasOwnProperty.call( - obj.info, key)), true); - assert.strictEqual(info.start >= start, true); - assert.strictEqual(info.end <= end, true); - // NOTE: this check will be removed when pruned logs are - // retrieved also - assert.strictEqual(info.prune <= info.start, true); - assert.strictEqual(info.cseq >= info.end, true); - assert.strictEqual(Object.prototype.hasOwnProperty.call( - obj, 'log'), true); - const logs = obj.log; - assert.strictEqual(Array.isArray(logs), true); - assert.strictEqual(logs.length >= 1, true); - assert.strictEqual(logs.every(log => - (typeof log === 'object') && (Object.keys(log).length > 0) - ), true); - return done(); + const dataBufs = []; + let dataLen = 0; + stream.on('data', data => { + dataBufs.push(data); + dataLen += data.length; + }).on('error', done) + .on('end', () => { + const data = Buffer.concat(dataBufs, dataLen).toString(); + const obj = JSON.parse(data); + assert.strictEqual(Object.prototype.hasOwnProperty.call( + obj, 'info'), true); + const info = obj.info; + assert.strictEqual(['start', 'end', 'cseq', 'prune'] + .every(key => Object.prototype.hasOwnProperty.call( + obj.info, key)), true); + assert.strictEqual(info.start >= start, true); + assert.strictEqual(info.end <= end, true); + // NOTE: this check will be removed when pruned logs are + // retrieved also + assert.strictEqual(info.prune <= info.start, true); + assert.strictEqual(info.cseq >= info.end, true); + assert.strictEqual(Object.prototype.hasOwnProperty.call( + obj, 'log'), true); + const logs = obj.log; + assert.strictEqual(Array.isArray(logs), true); + assert.strictEqual(logs.length >= 1, true); + assert.strictEqual(logs.every(log => + (typeof log === 'object') && (Object.keys(log).length > 0) + ), true); + return done(); + }); + return undefined; }); }); });