Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bf: make getRaftLog() return a stream #126

Merged
merged 1 commit into from
Jan 17, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 41 additions & 19 deletions lib/RESTClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,38 @@ class RESTClient {
* @return {undefined}
*/
request(method, beginPath, log, params, data, callback) {
this.requestStreamed(
method, beginPath, log, params, data, (err, res) => {
if (err) {
return callback(err);
}
const ret = [];
let retLen = 0;
res.on('data', data => {
ret.push(data);
retLen += data.length;
}).on('error', callback)
.on('end', () => {
callback(null, Buffer.concat(ret, retLen).toString());
});
return undefined;
});
}

/**
* Streamed version of RESTClient.request() returning a byte
* stream object in callback
*
* @param {string} method - the HTTP method of the request
* @param {string} beginPath - formated path without parameters
* @param {object} log - logger
* @param {object} params - parameters of the request
* @param {string} data - data of the request
* @param {function} callback - callback
*
* @return {undefined}
*/
requestStreamed(method, beginPath, log, params, data, callback) {
log.debug('sending request', { httpMethod: method, beginPath });
let path = beginPath;
assert(typeof callback === 'function', 'callback must be a function');
Expand All @@ -355,9 +387,6 @@ class RESTClient {
options.key = this._key;
options.cert = this._cert;
}
const ret = [];
let retLen = 0;

// somehow options can get cyclical so fields need to be chosen
// for display
log.debug('direct request to MD endpoint', { httpMethod: method,
Expand All @@ -381,19 +410,12 @@ class RESTClient {
req.write(binData);
}

req.on('response', res => {
res.on('data', data => {
ret.push(data);
retLen += data.length;
}).on('error', callback).on('end', () => {
this.endRespond(res, Buffer.concat(ret, retLen).toString(),
log, callback);
});
}).on('error', error => {
// covers system errors like ECONNREFUSED, ECONNRESET etc.
log.error('error sending request to metadata', { error });
callback(errors.InternalError);
}).end();
req.on('response', res => this.endRespond(res, res, log, callback))
.on('error', error => {
// covers system errors like ECONNREFUSED, ECONNRESET etc.
log.error('error sending request to metadata', { error });
callback(errors.InternalError);
}).end();
}

/**
Expand All @@ -414,7 +436,7 @@ class RESTClient {

/**
* Get raft logs from bucketd
* get server's response and return it
* get server's response and return it as a stream
*
* @param {string} raftId - raft session id
* @param {number} [start=undefined] - starting sequence number. If it is
Expand All @@ -425,7 +447,7 @@ class RESTClient {
* @param {boolean} [targetLeader=undefined] - true: from leader instead of
* follower
* @param {string} reqUids - the identifier of the request
* @param {callback} callback - callback(err, info)
* @param {callback} callback - callback(err, stream)
* @param {werelogs.Logger} [logger] - Logger instance
* @return {undefined}
*/
Expand All @@ -443,7 +465,7 @@ class RESTClient {
query.targetLeader = targetLeader;
}
log.debug('getting raft log', { raftId, path, query });
this.request('GET', path, log, query, null, callback);
this.requestStreamed('GET', path, log, query, null, callback);
}
}

Expand Down
56 changes: 33 additions & 23 deletions tests/functional/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -81,32 +81,42 @@ describe('Bucket Client tests', function testClient() {
it('should get logs from bucketd', done => {
const start = 1;
const end = 10;
client.getRaftLog(0, 1, 10, true, null, (err, data) => {
client.getRaftLog(0, 1, 10, true, null, (err, stream) => {
if (err) {
return done(err);
}
const obj = JSON.parse(data);
assert.strictEqual(Object.prototype.hasOwnProperty.call(
obj, 'info'), true);
const info = obj.info;
assert.strictEqual(['start', 'end', 'cseq', 'prune']
.every(key => Object.prototype.hasOwnProperty.call(
obj.info, key)), true);
assert.strictEqual(info.start >= start, true);
assert.strictEqual(info.end <= end, true);
// NOTE: this check will be removed when pruned logs are
// retrieved also
assert.strictEqual(info.prune <= info.start, true);
assert.strictEqual(info.cseq >= info.end, true);
assert.strictEqual(Object.prototype.hasOwnProperty.call(
obj, 'log'), true);
const logs = obj.log;
assert.strictEqual(Array.isArray(logs), true);
assert.strictEqual(logs.length >= 1, true);
assert.strictEqual(logs.every(log =>
(typeof log === 'object') && (Object.keys(log).length > 0)
), true);
return done();
const dataBufs = [];
let dataLen = 0;
stream.on('data', data => {
dataBufs.push(data);
dataLen += data.length;
}).on('error', done)
.on('end', () => {
const data = Buffer.concat(dataBufs, dataLen).toString();
const obj = JSON.parse(data);
assert.strictEqual(Object.prototype.hasOwnProperty.call(
obj, 'info'), true);
const info = obj.info;
assert.strictEqual(['start', 'end', 'cseq', 'prune']
.every(key => Object.prototype.hasOwnProperty.call(
obj.info, key)), true);
assert.strictEqual(info.start >= start, true);
assert.strictEqual(info.end <= end, true);
// NOTE: this check will be removed when pruned logs are
// retrieved also
assert.strictEqual(info.prune <= info.start, true);
assert.strictEqual(info.cseq >= info.end, true);
assert.strictEqual(Object.prototype.hasOwnProperty.call(
obj, 'log'), true);
const logs = obj.log;
assert.strictEqual(Array.isArray(logs), true);
assert.strictEqual(logs.length >= 1, true);
assert.strictEqual(logs.every(log =>
(typeof log === 'object') && (Object.keys(log).length > 0)
), true);
return done();
});
return undefined;
});
});
});