Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fwdport 7.2 to rel/7.4 (formerly master) #404

Merged
3 commits merged into from
Feb 2, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 20 additions & 12 deletions lib/storage/metadata/bucketclient/LogConsumer.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
'use strict'; // eslint-disable-line

const stream = require('stream');
const jsonStream = require('JSONStream');

const werelogs = require('werelogs');

Expand Down Expand Up @@ -89,7 +90,7 @@ class LogConsumer {

this.bucketClient.getRaftLog(
this.raftSession, _params.startSeq, _params.limit,
false, null, (err, data) => {
false, null, (err, stream) => {
if (err) {
if (err.code === 404) {
// no such raft session, log and ignore
Expand All @@ -109,18 +110,25 @@ class LogConsumer {
'Error handling record log request', { error: err });
return cb(err);
}
let logResponse;
try {
logResponse = JSON.parse(data);
} catch (err) {
this.logger.error('received malformed JSON',
{ params });
// setup a temporary listener until the 'header' event
// is emitted
recordStream.on('error', err => {
this.logger.error('error receiving raft log',
{ error: err.message });
return cb(errors.InternalError);
}
logResponse.log.forEach(entry => recordStream.write(entry));
recordStream.end();
return cb(null, { info: logResponse.info,
log: recordStream });
});
const jsonResponse = stream.pipe(jsonStream.parse('log.*'));
jsonResponse.pipe(recordStream);
stream.on('error', err => recordStream.emit('error', err));
jsonResponse
.on('header', header => {
// remove temporary listener
recordStream.removeAllListeners('error');
return cb(null, { info: header.info,
log: recordStream });
})
.on('error', err => recordStream.emit('error', err));
return undefined;
}, this.logger.newRequestLogger());
}
}
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
"ioredis": "2.4.0",
"ipaddr.js": "1.2.0",
"joi": "^10.6",
"JSONStream": "^1.0.0",
"level": "~1.6.0",
"level-sublevel": "~6.6.1",
"node-forge": "^0.7.1",
Expand Down
55 changes: 52 additions & 3 deletions tests/unit/storage/metadata/bucketclient/LogConsumer.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
'use strict'; //eslint-disable-line

const assert = require('assert');
const stream = require('stream');
const errors = require('../../../../../lib/errors');

const LogConsumer = require(
'../../../../../lib/storage/metadata/bucketclient/LogConsumer.js');

/* eslint-disable max-len */
const mockedLogResponse = `{
const mockedLogResponseData = `{
"info": { "start": 10, "end": 11, "prune": 10, "cseq": 11 },
"log": [
{
Expand Down Expand Up @@ -41,12 +42,44 @@ const mockedLogResponse = `{
]
}`;

const malformedLogResponse = `{
const malformedLogResponseData = `{
"info": not json!,
"log": []
}`;

const malformedLogEntryData = `{
"info": { "start": 10, "end": 11, "prune": 10, "cseq": 11 },
"log": [
{
"db": "funbucket",
"entries": [
{
"key": "coolkey0",
"value": "42"
}
],
"method": 8
},
{{{
]
}`;
/* eslint-enable max-len */

class MockStream extends stream.Readable {
constructor(dataSource) {
super();
this._dataSource = dataSource;
}
_read() {
this.push(this._dataSource);
this.push(null);
}
}

const mockedLogResponse = new MockStream(mockedLogResponseData);
const malformedLogResponse = new MockStream(malformedLogResponseData);
const malformedLogEntry = new MockStream(malformedLogEntryData);

// mock a simple bucketclient to get a fake raft log
class BucketClientMock {

Expand All @@ -63,6 +96,9 @@ class BucketClientMock {
case 4:
return process.nextTick(() => callback(null,
malformedLogResponse));
case 5:
return process.nextTick(() => callback(null,
malformedLogEntry));
default:
assert.fail();
}
Expand Down Expand Up @@ -153,7 +189,7 @@ describe('raft record log client', () => {
done();
});
});
it('should not crash with malformed log response', done => {
it('should return error with malformed log response', done => {
const logClient = new LogConsumer({ bucketClient,
raftSession: 4 });
logClient.readRecords({}, err => {
Expand All @@ -162,5 +198,18 @@ describe('raft record log client', () => {
done();
});
});
it('should emit error event if a log entry is malformed', done => {
const logClient = new LogConsumer({ bucketClient,
raftSession: 5 });
logClient.readRecords({}, (err, res) => {
assert.ifError(err);
assert(res.info);
assert(res.log);
res.log.on('error', err => {
assert(err);
done();
});
});
});
});
});