Skip to content

Commit

Permalink
merge #126
Browse files Browse the repository at this point in the history
  • Loading branch information
ironman-machine authored and Cloud User committed Jan 17, 2018
2 parents bd7b4d8 + 2a16b8e commit b0836dc
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 42 deletions.
60 changes: 41 additions & 19 deletions lib/RESTClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,38 @@ class RESTClient {
* @return {undefined}
*/
request(method, beginPath, log, params, data, callback) {
this.requestStreamed(
method, beginPath, log, params, data, (err, res) => {
if (err) {
return callback(err);
}
const ret = [];
let retLen = 0;
res.on('data', data => {
ret.push(data);
retLen += data.length;
}).on('error', callback)
.on('end', () => {
callback(null, Buffer.concat(ret, retLen).toString());
});
return undefined;
});
}

/**
* Streamed version of RESTClient.request() returning a byte
* stream object in callback
*
* @param {string} method - the HTTP method of the request
* @param {string} beginPath - formated path without parameters
* @param {object} log - logger
* @param {object} params - parameters of the request
* @param {string} data - data of the request
* @param {function} callback - callback
*
* @return {undefined}
*/
requestStreamed(method, beginPath, log, params, data, callback) {
log.debug('sending request', { httpMethod: method, beginPath });
let path = beginPath;
assert(typeof callback === 'function', 'callback must be a function');
Expand All @@ -355,9 +387,6 @@ class RESTClient {
options.key = this._key;
options.cert = this._cert;
}
const ret = [];
let retLen = 0;

// somehow options can get cyclical so fields need to be chosen
// for display
log.debug('direct request to MD endpoint', { httpMethod: method,
Expand All @@ -381,19 +410,12 @@ class RESTClient {
req.write(binData);
}

req.on('response', res => {
res.on('data', data => {
ret.push(data);
retLen += data.length;
}).on('error', callback).on('end', () => {
this.endRespond(res, Buffer.concat(ret, retLen).toString(),
log, callback);
});
}).on('error', error => {
// covers system errors like ECONNREFUSED, ECONNRESET etc.
log.error('error sending request to metadata', { error });
callback(errors.InternalError);
}).end();
req.on('response', res => this.endRespond(res, res, log, callback))
.on('error', error => {
// covers system errors like ECONNREFUSED, ECONNRESET etc.
log.error('error sending request to metadata', { error });
callback(errors.InternalError);
}).end();
}

/**
Expand All @@ -414,7 +436,7 @@ class RESTClient {

/**
* Get raft logs from bucketd
* get server's response and return it
* get server's response and return it as a stream
*
* @param {string} raftId - raft session id
* @param {number} [start=undefined] - starting sequence number. If it is
Expand All @@ -425,7 +447,7 @@ class RESTClient {
* @param {boolean} [targetLeader=undefined] - true: from leader instead of
* follower
* @param {string} reqUids - the identifier of the request
* @param {callback} callback - callback(err, info)
* @param {callback} callback - callback(err, stream)
* @param {werelogs.Logger} [logger] - Logger instance
* @return {undefined}
*/
Expand All @@ -443,7 +465,7 @@ class RESTClient {
query.targetLeader = targetLeader;
}
log.debug('getting raft log', { raftId, path, query });
this.request('GET', path, log, query, null, callback);
this.requestStreamed('GET', path, log, query, null, callback);
}
}

Expand Down
56 changes: 33 additions & 23 deletions tests/functional/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -81,32 +81,42 @@ describe('Bucket Client tests', function testClient() {
it('should get logs from bucketd', done => {
const start = 1;
const end = 10;
client.getRaftLog(0, 1, 10, true, null, (err, data) => {
client.getRaftLog(0, 1, 10, true, null, (err, stream) => {
if (err) {
return done(err);
}
const obj = JSON.parse(data);
assert.strictEqual(Object.prototype.hasOwnProperty.call(
obj, 'info'), true);
const info = obj.info;
assert.strictEqual(['start', 'end', 'cseq', 'prune']
.every(key => Object.prototype.hasOwnProperty.call(
obj.info, key)), true);
assert.strictEqual(info.start >= start, true);
assert.strictEqual(info.end <= end, true);
// NOTE: this check will be removed when pruned logs are
// retrieved also
assert.strictEqual(info.prune <= info.start, true);
assert.strictEqual(info.cseq >= info.end, true);
assert.strictEqual(Object.prototype.hasOwnProperty.call(
obj, 'log'), true);
const logs = obj.log;
assert.strictEqual(Array.isArray(logs), true);
assert.strictEqual(logs.length >= 1, true);
assert.strictEqual(logs.every(log =>
(typeof log === 'object') && (Object.keys(log).length > 0)
), true);
return done();
const dataBufs = [];
let dataLen = 0;
stream.on('data', data => {
dataBufs.push(data);
dataLen += data.length;
}).on('error', done)
.on('end', () => {
const data = Buffer.concat(dataBufs, dataLen).toString();
const obj = JSON.parse(data);
assert.strictEqual(Object.prototype.hasOwnProperty.call(
obj, 'info'), true);
const info = obj.info;
assert.strictEqual(['start', 'end', 'cseq', 'prune']
.every(key => Object.prototype.hasOwnProperty.call(
obj.info, key)), true);
assert.strictEqual(info.start >= start, true);
assert.strictEqual(info.end <= end, true);
// NOTE: this check will be removed when pruned logs are
// retrieved also
assert.strictEqual(info.prune <= info.start, true);
assert.strictEqual(info.cseq >= info.end, true);
assert.strictEqual(Object.prototype.hasOwnProperty.call(
obj, 'log'), true);
const logs = obj.log;
assert.strictEqual(Array.isArray(logs), true);
assert.strictEqual(logs.length >= 1, true);
assert.strictEqual(logs.every(log =>
(typeof log === 'object') && (Object.keys(log).length > 0)
), true);
return done();
});
return undefined;
});
});
});

0 comments on commit b0836dc

Please sign in to comment.